You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/11/17 06:56:00 UTC

spark git commit: [SPARK-4410][SQL] Add support for external sort

Repository: spark
Updated Branches:
  refs/heads/master 5168c6ca9 -> 64c6b9bad


[SPARK-4410][SQL] Add support for external sort

Adds a new operator, `ExternalSort`, that uses Spark's `ExternalSorter` class and can spill to disk.  It is off by default for now, but we might consider making it the default if benchmarks show that it does not regress performance.

Author: Michael Armbrust <mi...@databricks.com>

Closes #3268 from marmbrus/externalSort and squashes the following commits:

48b9726 [Michael Armbrust] comments
b98799d [Michael Armbrust] Add test
afd7562 [Michael Armbrust] Add support for external sort.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64c6b9ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64c6b9ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64c6b9ba

Branch: refs/heads/master
Commit: 64c6b9bad559c21f25cd9fbe37c8813cdab939f2
Parents: 5168c6c
Author: Michael Armbrust <mi...@databricks.com>
Authored: Sun Nov 16 21:55:57 2014 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Nov 16 21:55:57 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  7 ++++
 .../spark/sql/execution/SparkStrategies.scala   |  5 ++-
 .../spark/sql/execution/basicOperators.scala    | 37 +++++++++++++++++---
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 16 ++++++++-
 4 files changed, 59 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64c6b9ba/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index cd7d78e..9697beb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -39,6 +39,10 @@ private[spark] object SQLConf {
 
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
 
+  // Options that control which operators can be chosen by the query planner.  These should be
+  // considered hints and may be ignored by future versions of Spark SQL.
+  val EXTERNAL_SORT = "spark.sql.planner.externalSort"
+
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
 
@@ -96,6 +100,9 @@ private[sql] trait SQLConf {
   private[spark] def parquetFilterPushDown =
     getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
 
+  /** When true the planner will use the external sort, which may spill to disk. */
+  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
+
   /**
    * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
    * that evaluates expressions found in queries.  In general this custom code runs much faster

http://git-wip-us.apache.org/repos/asf/spark/blob/64c6b9ba/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 03cd5bd..7ef1f9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -263,9 +263,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Distinct(child) =>
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
+
+      case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
+        execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil
       case logical.Sort(sortExprs, child) =>
-        // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
         execution.Sort(sortExprs, global = true, planLater(child)):: Nil
+
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.

http://git-wip-us.apache.org/repos/asf/spark/blob/64c6b9ba/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 1b8ba3a..e53723c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair
+import org.apache.spark.util.collection.ExternalSorter
 
 /**
  * :: DeveloperApi ::
@@ -189,6 +190,9 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
 
 /**
  * :: DeveloperApi ::
+ * Performs a sort on-heap.
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ *               if necessary.
  */
 @DeveloperApi
 case class Sort(
@@ -199,12 +203,37 @@ case class Sort(
   override def requiredChildDistribution =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
+  override def execute() = attachTree(this, "sort") {
+    child.execute().mapPartitions( { iterator =>
+      val ordering = newOrdering(sortOrder, child.output)
+      iterator.map(_.copy()).toArray.sorted(ordering).iterator
+    }, preservesPartitioning = true)
+  }
+
+  override def output = child.output
+}
+
+/**
+ * :: DeveloperApi ::
+ * Performs a sort, spilling to disk as needed.
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ *               if necessary.
+ */
+@DeveloperApi
+case class ExternalSort(
+    sortOrder: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan)
+  extends UnaryNode {
+  override def requiredChildDistribution =
+    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
   override def execute() = attachTree(this, "sort") {
-    child.execute()
-      .mapPartitions( { iterator =>
-        val ordering = newOrdering(sortOrder, child.output)
-        iterator.map(_.copy()).toArray.sorted(ordering).iterator
+    child.execute().mapPartitions( { iterator =>
+      val ordering = newOrdering(sortOrder, child.output)
+      val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
+      sorter.insertAll(iterator.map(r => (r, null)))
+      sorter.iterator.map(_._1)
     }, preservesPartitioning = true)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64c6b9ba/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ce5672c..a635154 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -196,7 +196,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       Seq(Seq("1")))
   }
 
-  test("sorting") {
+  def sortTest() = {
     checkAnswer(
       sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC"),
       Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
@@ -238,6 +238,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       mapData.collect().sortBy(_.data(1)).reverse.toSeq)
   }
 
+  test("sorting") {
+    val before = externalSortEnabled
+    setConf(SQLConf.EXTERNAL_SORT, "false")
+    sortTest()
+    setConf(SQLConf.EXTERNAL_SORT, before.toString)
+  }
+
+  test("external sorting") {
+    val before = externalSortEnabled
+    setConf(SQLConf.EXTERNAL_SORT, "true")
+    sortTest()
+    setConf(SQLConf.EXTERNAL_SORT, before.toString)
+  }
+
   test("limit") {
     checkAnswer(
       sql("SELECT * FROM testData LIMIT 10"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org