Posted to commits@spark.apache.org by ma...@apache.org on 2015/06/12 07:15:25 UTC

spark git commit: [SPARK-8317] [SQL] Do not push sort into shuffle in Exchange operator

Repository: spark
Updated Branches:
  refs/heads/master 767cc94ca -> b9d177c51


[SPARK-8317] [SQL] Do not push sort into shuffle in Exchange operator

In some cases, Spark SQL pushes sorting operations into the shuffle layer by specifying a key ordering as part of the shuffle dependency. I think that we should not do this:

- Since we do not delegate aggregation to Spark's shuffle, specifying the keyOrdering as part of the shuffle has no effect on the shuffle map side; the sort still happens entirely on the reduce side, so there is no map-side benefit to pushing it into the shuffle.
- By performing the sort ourselves (by inserting a sort operator after the shuffle instead), we can use the Exchange planner to choose specialized sorting implementations based on the types of rows being sorted.
- We can remove some complexity from SqlSerializer2 by not requiring it to know about sort orderings, since SQL's own sort operators will already perform the necessary defensive copying.

This patch removes Exchange's `canSortWithShuffle` path and the associated code in `SqlSerializer2`. Shuffles that used to go through the `canSortWithShuffle` path would always wind up using Spark's `ExternalSorter` (inside of `HashShuffleReader`); to avoid a performance regression as a result of handling these sorts ourselves, I've changed the SQLConf default so that external sorting is enabled by default.
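
To make the before/after concrete, here is a small standalone sketch at the plain RDD level (not the SQL planner code in the diff below; the data and names are made up for illustration). "Pushing the sort into the shuffle" means setting a key ordering on the ShuffledRDD so that the reduce side sorts while reading; the alternative adopted by this patch is to shuffle first and then sort as a separate step:

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    import org.apache.spark.rdd.ShuffledRDD

    object SortPlacementSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
        val pairs = sc.parallelize(Seq(3 -> "c", 1 -> "a", 2 -> "b"))
        val part = new HashPartitioner(2)

        // Sort pushed into the shuffle: the shuffle reader sorts each partition by key.
        val sortedInShuffle = new ShuffledRDD[Int, String, String](pairs, part)
          .setKeyOrdering(Ordering.Int)

        // What this patch moves to: a plain shuffle, then an explicit per-partition sort.
        val sortedAfterShuffle = new ShuffledRDD[Int, String, String](pairs, part)
          .mapPartitions(iter => iter.toSeq.sortBy(_._1).iterator)

        println(sortedInShuffle.glom().collect().map(_.toSeq).toSeq)
        println(sortedAfterShuffle.glom().collect().map(_.toSeq).toSeq)
        sc.stop()
      }
    }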

Author: Josh Rosen <jo...@databricks.com>

Closes #6772 from JoshRosen/SPARK-8317 and squashes the following commits:

ebf9c0f [Josh Rosen] Do not push sort into shuffle in Exchange operator
bf3b4c8 [Josh Rosen] Enable external sort by default


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9d177c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9d177c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9d177c5

Branch: refs/heads/master
Commit: b9d177c5110cd054fdb9bcbeeb5f4ca9aa645dc1
Parents: 767cc94
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Jun 11 22:15:15 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Jun 11 22:15:15 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  2 +-
 .../apache/spark/sql/execution/Exchange.scala   | 54 ++++++--------------
 .../sql/execution/SparkSqlSerializer2.scala     | 22 +++-----
 3 files changed, 24 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b9d177c5/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index be786f9..87f4048 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -161,7 +161,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
     getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
 
   /** When true the planner will use the external sort, which may spill to disk. */
-  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
+  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "true").toBoolean
 
   /**
    * Sort merge join would sort the two side of join first, and then iterate both sides together
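
(Side note on the default change above: the EXTERNAL_SORT key string itself is not shown in this hunk. Assuming it is the usual key from this line of releases, spark.sql.planner.externalSort, a user who prefers the old in-memory Sort operator could still opt out explicitly; this is a hypothetical usage sketch, not part of the patch.)

    // Hypothetical: assumes EXTERNAL_SORT maps to "spark.sql.planner.externalSort".
    sqlContext.setConf("spark.sql.planner.externalSort", "false")
    // or at submission time:
    //   spark-submit --conf spark.sql.planner.externalSort=false ...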

http://git-wip-us.apache.org/repos/asf/spark/blob/b9d177c5/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index f25d10f..6fa7ccc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -31,16 +31,6 @@ import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.util.MutablePair
 
-object Exchange {
-  /**
-   * Returns true when the ordering expressions are a subset of the key.
-   * if true, ShuffledRDD can use `setKeyOrdering(orderingKey)` to sort within [[Exchange]].
-   */
-  def canSortWithShuffle(partitioning: Partitioning, desiredOrdering: Seq[SortOrder]): Boolean = {
-    desiredOrdering.map(_.child).toSet.subsetOf(partitioning.keyExpressions.toSet)
-  }
-}
-
 /**
  * :: DeveloperApi ::
  * Performs a shuffle that will result in the desired `newPartitioning`.  Optionally sorts each
@@ -143,7 +133,6 @@ case class Exchange(
   private def getSerializer(
       keySchema: Array[DataType],
       valueSchema: Array[DataType],
-      hasKeyOrdering: Boolean,
       numPartitions: Int): Serializer = {
     // It is true when there is no field that needs to be write out.
     // For now, we will not use SparkSqlSerializer2 when noField is true.
@@ -159,7 +148,7 @@ case class Exchange(
 
     val serializer = if (useSqlSerializer2) {
       logInfo("Using SparkSqlSerializer2.")
-      new SparkSqlSerializer2(keySchema, valueSchema, hasKeyOrdering)
+      new SparkSqlSerializer2(keySchema, valueSchema)
     } else {
       logInfo("Using SparkSqlSerializer.")
       new SparkSqlSerializer(sparkConf)
@@ -173,7 +162,7 @@ case class Exchange(
       case HashPartitioning(expressions, numPartitions) =>
         val keySchema = expressions.map(_.dataType).toArray
         val valueSchema = child.output.map(_.dataType).toArray
-        val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
+        val serializer = getSerializer(keySchema, valueSchema, numPartitions)
         val part = new HashPartitioner(numPartitions)
 
         val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
@@ -189,15 +178,12 @@ case class Exchange(
           }
         }
         val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
-        if (newOrdering.nonEmpty) {
-          shuffled.setKeyOrdering(keyOrdering)
-        }
         shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
       case RangePartitioning(sortingExpressions, numPartitions) =>
         val keySchema = child.output.map(_.dataType).toArray
-        val serializer = getSerializer(keySchema, null, newOrdering.nonEmpty, numPartitions)
+        val serializer = getSerializer(keySchema, null, numPartitions)
 
         val childRdd = child.execute()
         val part: Partitioner = {
@@ -222,15 +208,12 @@ case class Exchange(
         }
 
         val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
-        if (newOrdering.nonEmpty) {
-          shuffled.setKeyOrdering(keyOrdering)
-        }
         shuffled.setSerializer(serializer)
         shuffled.map(_._1)
 
       case SinglePartition =>
         val valueSchema = child.output.map(_.dataType).toArray
-        val serializer = getSerializer(null, valueSchema, hasKeyOrdering = false, 1)
+        val serializer = getSerializer(null, valueSchema, numPartitions = 1)
         val partitioner = new HashPartitioner(1)
 
         val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
@@ -306,29 +289,24 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
           child: SparkPlan): SparkPlan = {
         val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
         val needsShuffle = child.outputPartitioning != partitioning
-        val canSortWithShuffle = Exchange.canSortWithShuffle(partitioning, rowOrdering)
 
-        if (needSort && needsShuffle && canSortWithShuffle) {
-          Exchange(partitioning, rowOrdering, child)
+        val withShuffle = if (needsShuffle) {
+          Exchange(partitioning, Nil, child)
         } else {
-          val withShuffle = if (needsShuffle) {
-            Exchange(partitioning, Nil, child)
-          } else {
-            child
-          }
+          child
+        }
 
-          val withSort = if (needSort) {
-            if (sqlContext.conf.externalSortEnabled) {
-              ExternalSort(rowOrdering, global = false, withShuffle)
-            } else {
-              Sort(rowOrdering, global = false, withShuffle)
-            }
+        val withSort = if (needSort) {
+          if (sqlContext.conf.externalSortEnabled) {
+            ExternalSort(rowOrdering, global = false, withShuffle)
           } else {
-            withShuffle
+            Sort(rowOrdering, global = false, withShuffle)
           }
-
-          withSort
+        } else {
+          withShuffle
         }
+
+        withSort
       }
 
       if (meetsRequirements && compatible && !needsAnySort) {
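
(One way to observe the planner change above from the user side, sketched under the assumption that sort-merge join is enabled via spark.sql.planner.sortMergeJoin and that temp tables a and b are registered; exact plan strings vary by version:)

    // Hypothetical inspection sketch, not part of the patch.
    sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
    val joined = sqlContext.sql("SELECT * FROM a JOIN b ON a.key = b.key")
    // With this patch, the physical plan should show an (External)Sort operator
    // above each Exchange, instead of a shuffle that sorts internally.
    joined.explain()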

http://git-wip-us.apache.org/repos/asf/spark/blob/b9d177c5/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 202e448..15b6936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -86,7 +86,6 @@ private[sql] class Serializer2SerializationStream(
 private[sql] class Serializer2DeserializationStream(
     keySchema: Array[DataType],
     valueSchema: Array[DataType],
-    hasKeyOrdering: Boolean,
     in: InputStream)
   extends DeserializationStream with Logging  {
 
@@ -96,14 +95,9 @@ private[sql] class Serializer2DeserializationStream(
     if (schema == null) {
       () => null
     } else {
-      if (hasKeyOrdering) {
-        // We have key ordering specified in a ShuffledRDD, it is not safe to reuse a mutable row.
-        () => new GenericMutableRow(schema.length)
-      } else {
-        // It is safe to reuse the mutable row.
-        val mutableRow = new SpecificMutableRow(schema)
-        () => mutableRow
-      }
+      // It is safe to reuse the mutable row.
+      val mutableRow = new SpecificMutableRow(schema)
+      () => mutableRow
     }
   }
 
@@ -133,8 +127,7 @@ private[sql] class Serializer2DeserializationStream(
 
 private[sql] class SparkSqlSerializer2Instance(
     keySchema: Array[DataType],
-    valueSchema: Array[DataType],
-    hasKeyOrdering: Boolean)
+    valueSchema: Array[DataType])
   extends SerializerInstance {
 
   def serialize[T: ClassTag](t: T): ByteBuffer =
@@ -151,7 +144,7 @@ private[sql] class SparkSqlSerializer2Instance(
   }
 
   def deserializeStream(s: InputStream): DeserializationStream = {
-    new Serializer2DeserializationStream(keySchema, valueSchema, hasKeyOrdering, s)
+    new Serializer2DeserializationStream(keySchema, valueSchema, s)
   }
 }
 
@@ -164,14 +157,13 @@ private[sql] class SparkSqlSerializer2Instance(
  */
 private[sql] class SparkSqlSerializer2(
     keySchema: Array[DataType],
-    valueSchema: Array[DataType],
-    hasKeyOrdering: Boolean)
+    valueSchema: Array[DataType])
   extends Serializer
   with Logging
   with Serializable{
 
   def newInstance(): SerializerInstance =
-    new SparkSqlSerializer2Instance(keySchema, valueSchema, hasKeyOrdering)
+    new SparkSqlSerializer2Instance(keySchema, valueSchema)
 
   override def supportsRelocationOfSerializedObjects: Boolean = {
     // SparkSqlSerializer2 is stateless and writes no stream headers
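
(Background on the hasKeyOrdering branch deleted above: reusing a single mutable row during deserialization is only safe if whoever buffers the rows copies them first; with this patch the SQL sort operators perform that defensive copy, so the per-record allocation is no longer needed. The snippet below illustrates the hazard in plain Scala with made-up types; it is not Spark code.)

    // Illustration only: buffering a reused mutable object without copying it.
    class MutableRowLike(var value: Int) {
      def copyRow(): MutableRowLike = new MutableRowLike(value)
    }

    object RowReuseSketch {
      def main(args: Array[String]): Unit = {
        def records(reused: MutableRowLike) =
          Iterator(1, 2, 3).map { v => reused.value = v; reused }

        // No defensive copy: every buffered reference points at the same object,
        // so all entries end up with the last value written.
        val wrong = records(new MutableRowLike(0)).toVector.map(_.value)               // Vector(3, 3, 3)

        // Defensive copy before buffering (what the sort operators do): correct.
        val right = records(new MutableRowLike(0)).map(_.copyRow()).toVector.map(_.value) // Vector(1, 2, 3)

        println(s"without copy: $wrong; with copy: $right")
      }
    }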


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org