You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/05/09 04:10:03 UTC

spark git commit: [SPARK-7375] [SQL] Avoid row copying in exchange when sort.serializeMapOutputs takes effect

Repository: spark
Updated Branches:
  refs/heads/master 0a901dd3a -> cde548388


[SPARK-7375] [SQL] Avoid row copying in exchange when sort.serializeMapOutputs takes effect

This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in #4450 /
SPARK-4550).

This patch also copies the rows that are fed into RangePartitioner's partition-bounds calculation. The copy is necessary because that calculation buffers the sampled objects, and Spark SQL operators reuse the same mutable row object across records.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5948)
<!-- Reviewable:end -->

Author: Josh Rosen <jo...@databricks.com>

Closes #5948 from JoshRosen/SPARK-7375 and squashes the following commits:

f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange
899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375
6a6bfce [Josh Rosen] Fix issue related to RangePartitioning:
ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cde54838
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cde54838
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cde54838

Branch: refs/heads/master
Commit: cde5483884068b0ae1470b9b9b3ee54ab944ab12
Parents: 0a901dd
Author: Josh Rosen <jo...@databricks.com>
Authored: Fri May 8 22:09:55 2015 -0400
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri May 8 22:09:55 2015 -0400

----------------------------------------------------------------------
 .../apache/spark/sql/execution/Exchange.scala   | 156 ++++++++++++-------
 1 file changed, 100 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cde54838/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index f02fa81..c3d2c70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner}
+import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.{SQLContext, Row}
@@ -59,11 +59,62 @@ case class Exchange(
 
   override def output: Seq[Attribute] = child.output
 
-  /** We must copy rows when sort based shuffle is on */
-  protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-
-  private val bypassMergeThreshold =
-    child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+  /**
+   * Determines whether records must be defensively copied before being sent to the shuffle.
+   * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
+   * shuffle code assumes that objects are immutable and hence does not perform its own defensive
+   * copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In
+   * order to properly shuffle the output of these operators, we need to perform our own copying
+   * prior to sending records to the shuffle. This copying is expensive, so we try to avoid it
+   * whenever possible. This method encapsulates the logic for choosing when to copy.
+   *
+   * In the long run, we might want to push this logic into core's shuffle APIs so that we don't
+   * have to rely on knowledge of core internals here in SQL.
+   *
+   * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
+   *
+   * @param partitioner the partitioner for the shuffle
+   * @param serializer the serializer that will be used to write rows
+   * @return true if rows should be copied before being shuffled, false otherwise
+   */
+  private def needToCopyObjectsBeforeShuffle(
+      partitioner: Partitioner,
+      serializer: Serializer): Boolean = {
+    // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
+    // passed instead of directly passing the number of partitions in order to guard against
+    // corner-cases where a partitioner constructed with `numPartitions` partitions may output
+    // fewer partitions (like RangePartitioner, for example).
+    val conf = child.sqlContext.sparkContext.conf
+    val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+    val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
+    if (newOrdering.nonEmpty) {
+      // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
+      // which requires a defensive copy.
+      true
+    } else if (sortBasedShuffleOn) {
+      // Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory.
+      // However, there are two special cases where we can avoid the copy, described below:
+      if (partitioner.numPartitions <= bypassMergeThreshold) {
+        // If the number of output partitions is sufficiently small, then Spark will fall back to
+        // the old hash-based shuffle write path which doesn't buffer deserialized records.
+        // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
+        false
+      } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
+        // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
+        // them. This optimization is guarded by a feature-flag and is only applied in cases where
+        // shuffle dependency does not specify an ordering and the record serializer has certain
+        // properties. If this optimization is enabled, we can safely avoid the copy.
+        false
+      } else {
+        // None of the special cases held, so we must copy.
+        true
+      }
+    } else {
+      // We're using hash-based shuffle, so we don't need to copy.
+      false
+    }
+  }
 
   private val keyOrdering = {
     if (newOrdering.nonEmpty) {
@@ -81,7 +132,7 @@ case class Exchange(
 
   @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
 
-  def serializer(
+  private def getSerializer(
       keySchema: Array[DataType],
       valueSchema: Array[DataType],
       hasKeyOrdering: Boolean,
@@ -112,17 +163,12 @@ case class Exchange(
   protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
-        // TODO: Eliminate redundant expressions in grouping key and value.
-        // This is a workaround for SPARK-4479. When:
-        //  1. sort based shuffle is on, and
-        //  2. the partition number is under the merge threshold, and
-        //  3. no ordering is required
-        // we can avoid the defensive copies to improve performance. In the long run, we probably
-        // want to include information in shuffle dependencies to indicate whether elements in the
-        // source RDD should be copied.
-        val willMergeSort = sortBasedShuffleOn && numPartitions > bypassMergeThreshold
-
-        val rdd = if (willMergeSort || newOrdering.nonEmpty) {
+        val keySchema = expressions.map(_.dataType).toArray
+        val valueSchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
+        val part = new HashPartitioner(numPartitions)
+
+        val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
             iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -134,52 +180,52 @@ case class Exchange(
             iter.map(r => mutablePair.update(hashExpressions(r), r))
           }
         }
-        val part = new HashPartitioner(numPartitions)
-        val shuffled =
-          if (newOrdering.nonEmpty) {
-            new ShuffledRDD[Row, Row, Row](rdd, part).setKeyOrdering(keyOrdering)
-          } else {
-            new ShuffledRDD[Row, Row, Row](rdd, part)
-          }
-        val keySchema = expressions.map(_.dataType).toArray
-        val valueSchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(
-          serializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions))
-
+        val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
+        if (newOrdering.nonEmpty) {
+          shuffled.setKeyOrdering(keyOrdering)
+        }
+        shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
       case RangePartitioning(sortingExpressions, numPartitions) =>
-        val rdd = if (sortBasedShuffleOn || newOrdering.nonEmpty) {
-          child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))}
-        } else {
-          child.execute().mapPartitions { iter =>
-            val mutablePair = new MutablePair[Row, Null](null, null)
-            iter.map(row => mutablePair.update(row, null))
+        val keySchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(keySchema, null, newOrdering.nonEmpty, numPartitions)
+
+        val childRdd = child.execute()
+        val part: Partitioner = {
+          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
+          // partition bounds. To get accurate samples, we need to copy the mutable keys.
+          val rddForSampling = childRdd.mapPartitions { iter =>
+            val mutablePair = new MutablePair[Row, Null]()
+            iter.map(row => mutablePair.update(row.copy(), null))
           }
+          // TODO: RangePartitioner should take an Ordering.
+          implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+          new RangePartitioner(numPartitions, rddForSampling, ascending = true)
         }
 
-        // TODO: RangePartitioner should take an Ordering.
-        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
-
-        val part = new RangePartitioner(numPartitions, rdd, ascending = true)
-        val shuffled =
-          if (newOrdering.nonEmpty) {
-            new ShuffledRDD[Row, Null, Null](rdd, part).setKeyOrdering(keyOrdering)
-          } else {
-            new ShuffledRDD[Row, Null, Null](rdd, part)
+        val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
+          childRdd.mapPartitions { iter => iter.map(row => (row.copy(), null))}
+        } else {
+          childRdd.mapPartitions { iter =>
+            val mutablePair = new MutablePair[Row, Null]()
+            iter.map(row => mutablePair.update(row, null))
           }
-        val keySchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(
-          serializer(keySchema, null, newOrdering.nonEmpty, numPartitions))
+        }
 
+        val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
+        if (newOrdering.nonEmpty) {
+          shuffled.setKeyOrdering(keyOrdering)
+        }
+        shuffled.setSerializer(serializer)
         shuffled.map(_._1)
 
       case SinglePartition =>
-        // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
-        // operators like `TakeOrdered` may require an ordering within the partition, and currently
-        // `SinglePartition` doesn't include ordering information.
-        // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
-        val rdd = if (sortBasedShuffleOn) {
+        val valueSchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(null, valueSchema, hasKeyOrdering = false, 1)
+        val partitioner = new HashPartitioner(1)
+
+        val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
           child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
         } else {
           child.execute().mapPartitions { iter =>
@@ -187,10 +233,8 @@ case class Exchange(
             iter.map(r => mutablePair.update(null, r))
           }
         }
-        val partitioner = new HashPartitioner(1)
         val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
-        val valueSchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(serializer(null, valueSchema, false, 1))
+        shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org