Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/24 21:43:51 UTC

spark git commit: [SPARK-4479][SQL] Avoids unnecessary defensive copies when sort based shuffle is on

Repository: spark
Updated Branches:
  refs/heads/master 29372b631 -> a6d7b61f9


[SPARK-4479][SQL] Avoids unnecessary defensive copies when sort based shuffle is on

This PR is a workaround for SPARK-4479. Two changes are introduced for the case where merge sort is bypassed in `ExternalSorter`:

1. also bypass the buffering of RDD elements, since buffering is the reason that `MutableRow`-backed row objects must be copied (see the sketch below), and
2. avoid defensive copies in the `Exchange` operator.
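
The copying problem is easy to reproduce outside Spark. Below is a minimal Scala sketch (not Spark code; `MutableRecord` is a hypothetical stand-in for a `MutableRow`-backed row) of why buffering a stream of reused mutable objects requires defensive copies, while streaming them straight to their destination does not:

    // Hypothetical stand-in for a MutableRow-backed row that an operator reuses.
    case class MutableRecord(var value: Int)

    object BufferingDemo extends App {
      val shared = MutableRecord(0)

      // An iterator that mutates and re-emits the same object for every element,
      // the way SQL operators reuse a single MutableRow.
      def rows: Iterator[MutableRecord] =
        Iterator.tabulate(3) { i => shared.value = i; shared }

      // Buffering without copying: every slot ends up pointing at the same object.
      println(rows.toArray.map(_.value).mkString(", "))               // 2, 2, 2 -- data lost

      // Copying each element before buffering preserves the values, at a cost.
      println(rows.map(_.copy()).toArray.map(_.value).mkString(", ")) // 0, 1, 2

      // Streaming each element straight to its destination (what this PR does when
      // merge sort is bypassed) consumes it immediately and needs no copy.
      rows.foreach(r => print(s"${r.value} "))                        // 0 1 2
    }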

Author: Cheng Lian <li...@databricks.com>

Closes #3422 from liancheng/avoids-defensive-copies and squashes the following commits:

591f2e9 [Cheng Lian] Passes all shuffle suites
0c3c91e [Cheng Lian] Fixes shuffle write metrics when merge sort is bypassed
ed5df3c [Cheng Lian] Fixes styling changes
f75089b [Cheng Lian] Avoids unnecessary defensive copies when sort based shuffle is on


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6d7b61f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6d7b61f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6d7b61f

Branch: refs/heads/master
Commit: a6d7b61f92dc7c1f9632cecb232afa8040ab2b4d
Parents: 29372b6
Author: Cheng Lian <li...@databricks.com>
Authored: Mon Nov 24 12:43:45 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Nov 24 12:43:45 2014 -0800

----------------------------------------------------------------------
 .../spark/util/collection/ExternalSorter.scala  | 23 +++++++++++++++++---
 .../scala/org/apache/spark/ShuffleSuite.scala   | 12 +++++-----
 .../apache/spark/sql/execution/Exchange.scala   | 16 +++++++++++++-
 3 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6d7b61f/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index c617ff5..15bda1c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -205,6 +205,13 @@ private[spark] class ExternalSorter[K, V, C](
         map.changeValue((getPartition(kv._1), kv._1), update)
         maybeSpillCollection(usingMap = true)
       }
+    } else if (bypassMergeSort) {
+      // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
+      if (records.hasNext) {
+        spillToPartitionFiles(records.map { kv =>
+          ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
+        })
+      }
     } else {
       // Stick values into our buffer
       while (records.hasNext) {
@@ -336,6 +343,10 @@ private[spark] class ExternalSorter[K, V, C](
    * @param collection whichever collection we're using (map or buffer)
    */
   private def spillToPartitionFiles(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
+    spillToPartitionFiles(collection.iterator)
+  }
+
+  private def spillToPartitionFiles(iterator: Iterator[((Int, K), C)]): Unit = {
     assert(bypassMergeSort)
 
     // Create our file writers if we haven't done so yet
@@ -350,9 +361,9 @@ private[spark] class ExternalSorter[K, V, C](
       }
     }
 
-    val it = collection.iterator  // No need to sort stuff, just write each element out
-    while (it.hasNext) {
-      val elem = it.next()
+    // No need to sort stuff, just write each element out
+    while (iterator.hasNext) {
+      val elem = iterator.next()
       val partitionId = elem._1._1
       val key = elem._1._2
       val value = elem._2
@@ -748,6 +759,12 @@ private[spark] class ExternalSorter[K, V, C](
 
     context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
     context.taskMetrics.diskBytesSpilled += diskBytesSpilled
+    context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
+      if (curWriteMetrics != null) {
+        m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten
+        m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime
+      }
+    }
 
     lengths
   }
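
For readers skimming the diff above: the new iterator-based `spillToPartitionFiles` overload lets the bypass path write records as they arrive instead of accumulating them in the in-memory buffer first. A simplified sketch of that streaming pattern (placeholder names, not the real `ExternalSorter` API):

    // Each record is routed to its partition writer the moment it is produced, so
    // the (possibly reused, mutable) input object is never retained by the sorter
    // and never needs a defensive copy.
    def writeDirect[K, V](
        records: Iterator[(K, V)],
        getPartition: K => Int,
        write: (Int, K, V) => Unit): Unit = {
      while (records.hasNext) {
        val (k, v) = records.next()
        write(getPartition(k), k, v)
      }
    }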

http://git-wip-us.apache.org/repos/asf/spark/blob/a6d7b61f/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index cda942e..85e5f9a 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -95,14 +95,14 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
 
-    // 10 partitions from 4 keys
-    val NUM_BLOCKS = 10
+    // 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
+    val NUM_BLOCKS = 201
     val a = sc.parallelize(1 to 4, NUM_BLOCKS)
     val b = a.map(x => (x, x*2))
 
     // NOTE: The default Java serializer doesn't create zero-sized blocks.
     //       So, use Kryo
-    val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(10))
+    val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(NUM_BLOCKS))
       .setSerializer(new KryoSerializer(conf))
 
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
@@ -122,13 +122,13 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
 
-    // 10 partitions from 4 keys
-    val NUM_BLOCKS = 10
+    // 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
+    val NUM_BLOCKS = 201
     val a = sc.parallelize(1 to 4, NUM_BLOCKS)
     val b = a.map(x => (x, x*2))
 
     // NOTE: The default Java serializer should create zero-sized blocks
-    val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(10))
+    val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(NUM_BLOCKS))
 
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
     assert(c.count === 4)
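
A note on the test change above: merge sort is only bypassed when the reduce-partition count does not exceed `spark.shuffle.sort.bypassMergeThreshold` (default 200) and the sorter needs no aggregation or ordering, so raising the partition count from 10 to 201 appears intended to keep these zero-sized-block tests on the non-bypass, merge-sort path. A small configuration sketch, with the default value written out explicitly:

    import org.apache.spark.SparkConf

    // Partition counts at or below this threshold (with no aggregation/ordering)
    // take the bypass path in ExternalSorter; counts above it use merge sort.
    val conf = new SparkConf()
      .setAppName("bypass-threshold-demo")
      .set("spark.shuffle.sort.bypassMergeThreshold", "200")  // 200 is the default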

http://git-wip-us.apache.org/repos/asf/spark/blob/a6d7b61f/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index cff7a01..d7c811c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -41,11 +41,21 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   /** We must copy rows when sort based shuffle is on */
   protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
 
+  private val bypassMergeThreshold =
+    child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+
   override def execute() = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
-        val rdd = if (sortBasedShuffleOn) {
+        // This is a workaround for SPARK-4479. When:
+        //  1. sort based shuffle is on, and
+        //  2. the partition number is under the merge threshold, and
+        //  3. no ordering is required
+        // we can avoid the defensive copies to improve performance. In the long run, we probably
+        // want to include information in shuffle dependencies to indicate whether elements in the
+        // source RDD should be copied.
+        val rdd = if (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
             iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -82,6 +92,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         shuffled.map(_._1)
 
       case SinglePartition =>
+        // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
+        // operators like `TakeOrdered` may require an ordering within the partition, and currently
+        // `SinglePartition` doesn't include ordering information.
+        // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
         val rdd = if (sortBasedShuffleOn) {
           child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
         } else {
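
Summarizing the `Exchange` change: under `HashPartitioning`, rows are now copied only when the sort-based shuffle will actually buffer them, i.e. when the partition count exceeds the bypass threshold; `SinglePartition` keeps copying because it carries no ordering information. A condensed sketch of that decision (simplified names, not Spark's API):

    // True when Exchange must defensively copy rows before a hash-partitioned
    // shuffle: sort-based shuffle is on and merge sort will NOT be bypassed
    // (partition count above the threshold), so ExternalSorter buffers the rows.
    def needsDefensiveCopy(
        sortBasedShuffleOn: Boolean,
        numPartitions: Int,
        bypassMergeThreshold: Int): Boolean =
      sortBasedShuffleOn && numPartitions > bypassMergeThreshold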

