Posted to commits@spark.apache.org by jo...@apache.org on 2015/06/14 01:15:15 UTC

spark git commit: [SPARK-8319] [CORE] [SQL] Update logic related to key orderings in shuffle dependencies

Repository: spark
Updated Branches:
  refs/heads/master ce1041c38 -> af31335ad


[SPARK-8319] [CORE] [SQL] Update logic related to key orderings in shuffle dependencies

This patch updates two pieces of logic that are related to handling of keyOrderings in ShuffleDependencies:

- The Tungsten ShuffleManager falls back to regular SortShuffleManager whenever the shuffle dependency specifies a key ordering, but a key ordering on its own does not require the fallback: technically we only need to fall back when an aggregator is specified. This patch updates the fallback logic to reflect this so that the Tungsten optimizations can apply to more workloads.

- The SQL Exchange operator performs defensive copying of shuffle inputs when a key ordering is specified, but this is unnecessary. The copying was added to guard against cases where ExternalSorter would buffer non-serialized records in memory.  When ExternalSorter is configured without an aggregator, it uses the following logic to determine whether to buffer records in a serialized or deserialized format:

   ```scala
   private val useSerializedPairBuffer =
     ordering.isEmpty &&
     conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
     ser.supportsRelocationOfSerializedObjects
   ```

   The `newOrdering.nonEmpty` branch in `Exchange.needToCopyObjectsBeforeShuffle`, removed by this patch, is not necessary:

   - It was checked even if we weren't using sort-based shuffle, but this was unnecessary because only SortShuffleManager performs map-side sorting.
   - Map-side sorting during shuffle writing is only performed for shuffles that perform map-side aggregation as part of the shuffle (to see this, look at how SortShuffleWriter constructs ExternalSorter). SQL never pushes aggregation into Spark's shuffle, so the ExternalSorter used on the map side is always constructed with an empty aggregator and an empty ordering. Spark SQL also always uses serializers that support relocation, so sort-shuffle will use the serialized pair buffer unless the user has explicitly disabled it via the SparkConf feature flag. Therefore, I think my optimization in Exchange should be safe.
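
The two decisions described above can be sketched as a small, self-contained Scala model. This is only an illustration under stated assumptions, not the code in this patch: `ShuffleKind`, `DepSketch`, `ShuffleDecisionSketch`, `canUseTungsten`, `needToCopy`, and the `MaxPartitions` value are hypothetical names introduced here. Any check that precedes the aggregator test in `UnsafeShuffleManager` lies outside the diff hunk and is omitted. The result of the bypass-merge branch is also not visible in the diff and is assumed to be "no copy".

```scala
// Simplified, hypothetical stand-ins for Spark's ShuffleManager implementations.
sealed trait ShuffleKind
case object HashShuffle extends ShuffleKind
case object SortShuffle extends ShuffleKind
case object TungstenShuffle extends ShuffleKind
case object FutureShuffle extends ShuffleKind // any manager added later

// Hypothetical summary of the ShuffleDependency fields that matter here.
final case class DepSketch(
    hasAggregator: Boolean,
    hasKeyOrdering: Boolean,
    numPartitions: Int)

object ShuffleDecisionSketch {
  // Placeholder for MAX_SHUFFLE_OUTPUT_PARTITIONS; this value is an assumption.
  val MaxPartitions: Int = 1 << 24

  // After the patch a key ordering alone no longer forces a fallback,
  // so hasKeyOrdering is deliberately not consulted.
  def canUseTungsten(dep: DepSketch): Boolean =
    !dep.hasAggregator && dep.numPartitions <= MaxPartitions

  // Mirrors the branch structure of the patched Exchange logic.
  def needToCopy(
      kind: ShuffleKind,
      numPartitions: Int,
      bypassMergeThreshold: Int,
      serializeMapOutputs: Boolean,
      supportsRelocation: Boolean): Boolean =
    kind match {
      case SortShuffle | TungstenShuffle =>
        // Only the original SortShuffleManager supports the bypass path.
        val bypassIsSupported = kind == SortShuffle
        if (bypassIsSupported && numPartitions <= bypassMergeThreshold) {
          false // assumed: the bypass path does not buffer deserialized records
        } else if (serializeMapOutputs && supportsRelocation) {
          false // records are buffered in serialized form, so no copy is needed
        } else {
          true // deserialized records may be buffered, so copy defensively
        }
      case HashShuffle => false  // hash-based shuffle does not need a copy
      case FutureShuffle => true // catch-all: copy to stay safe
    }
}
```

For example, `ShuffleDecisionSketch.canUseTungsten(DepSketch(hasAggregator = false, hasKeyOrdering = true, numPartitions = 2))` evaluates to `true` in this model, matching the new test case in UnsafeShuffleManagerSuite.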

Author: Josh Rosen <jo...@databricks.com>

Closes #6773 from JoshRosen/SPARK-8319 and squashes the following commits:

7a14129 [Josh Rosen] Revise comments; add handler to guard against future ShuffleManager implementations
07bb2c9 [Josh Rosen] Update comment to clarify circumstances under which shuffle operates on serialized records
269089a [Josh Rosen] Avoid unnecessary copy in SQL Exchange
34e526e [Josh Rosen] Enable Tungsten shuffle for non-agg shuffles w/ key orderings


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af31335a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af31335a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af31335a

Branch: refs/heads/master
Commit: af31335adce13e1452ce1990496c9bfac9778b5c
Parents: ce1041c
Author: Josh Rosen <jo...@databricks.com>
Authored: Sat Jun 13 16:14:24 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Sat Jun 13 16:14:24 2015 -0700

----------------------------------------------------------------------
 .../shuffle/unsafe/UnsafeShuffleManager.scala    |  3 ---
 .../unsafe/UnsafeShuffleManagerSuite.scala       | 19 ++++++++++---------
 .../apache/spark/sql/execution/Exchange.scala    | 19 +++++++++++--------
 3 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/af31335a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
index f2bfef3..df7bbd6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
@@ -56,9 +56,6 @@ private[spark] object UnsafeShuffleManager extends Logging {
     } else if (dependency.aggregator.isDefined) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an aggregator is defined")
       false
-    } else if (dependency.keyOrdering.isDefined) {
-      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
-      false
     } else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
         s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")

http://git-wip-us.apache.org/repos/asf/spark/blob/af31335a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
index a73e94e..6727934 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
@@ -76,6 +76,15 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
       mapSideCombine = false
     )))
 
+    // Shuffles with key orderings are supported as long as no aggregator is specified
+    assert(canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = Some(mock(classOf[Ordering[Any]])),
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
   }
 
   test("unsupported shuffle dependencies") {
@@ -100,14 +109,7 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
       mapSideCombine = false
     )))
 
-    // We do not support shuffles that perform any kind of aggregation or sorting of keys
-    assert(!canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = kryo,
-      keyOrdering = Some(mock(classOf[Ordering[Any]])),
-      aggregator = None,
-      mapSideCombine = false
-    )))
+    // We do not support shuffles that perform aggregation
     assert(!canUseUnsafeShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,
@@ -115,7 +117,6 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
       aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
       mapSideCombine = false
     )))
-    // We do not support shuffles that perform any kind of aggregation or sorting of keys
     assert(!canUseUnsafeShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,

http://git-wip-us.apache.org/repos/asf/spark/blob/af31335a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index c9a1883..edc64a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
 import org.apache.spark.sql.SQLContext
@@ -81,11 +82,7 @@ case class Exchange(
       shuffleManager.isInstanceOf[UnsafeShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
     val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
-    if (newOrdering.nonEmpty) {
-      // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
-      // which requires a defensive copy.
-      true
-    } else if (sortBasedShuffleOn) {
+    if (sortBasedShuffleOn) {
       val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
       if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
         // If we're using the original SortShuffleManager and the number of output partitions is
@@ -96,8 +93,11 @@ case class Exchange(
       } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
         // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
         // them. This optimization is guarded by a feature-flag and is only applied in cases where
-        // shuffle dependency does not specify an ordering and the record serializer has certain
-        // properties. If this optimization is enabled, we can safely avoid the copy.
+        // shuffle dependency does not specify an aggregator or ordering and the record serializer
+        // has certain properties. If this optimization is enabled, we can safely avoid the copy.
+        //
+        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
+        // need to check whether the optimization is enabled and supported by our serializer.
         //
         // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081).
         false
@@ -108,9 +108,12 @@ case class Exchange(
         // both cases, we must copy.
         true
       }
-    } else {
+    } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
       // We're using hash-based shuffle, so we don't need to copy.
       false
+    } else {
+      // Catch-all case to safely handle any future ShuffleManager implementations.
+      true
     }
   }
 

