You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/09/10 01:27:32 UTC

spark git commit: [SPARK-15453][SQL] FileSourceScanExec to extract `outputOrdering` information

Repository: spark
Updated Branches:
  refs/heads/master f7d214370 -> 335491704


[SPARK-15453][SQL] FileSourceScanExec to extract `outputOrdering` information

## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-15453

Extract sort ordering information in `FileSourceScanExec` so that the planner can make use of it. My motivation for this change was to bring Sort Merge join on par with Hive's Sort-Merge-Bucket join when the source tables are bucketed + sorted.

Query:

```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
```

Before:

```
== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
:  +- *Project [i#119, j#120, k#121]
:     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
:        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
   +- *Project [i#122, j#123, k#124]
      +- *Filter (isnotnull(k#124) && isnotnull(j#123))
         +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```

After:  (note that the `Sort` step is no longer there)

```
== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
:  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
:     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
   +- *Filter (isnotnull(j#52) && isnotnull(k#53))
      +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```

## How was this patch tested?

Added test cases in `BucketedReadSuite` (see the diff below). Ran all other tests in `JoinSuite`.

Author: Tejas Patil <te...@fb.com>

Closes #14864 from tejasapatil/SPARK-15453_smb_optimization.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33549170
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33549170
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33549170

Branch: refs/heads/master
Commit: 335491704c526921da3b3c5035175677ba5b92de
Parents: f7d2143
Author: Tejas Patil <te...@fb.com>
Authored: Sat Sep 10 09:27:22 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Sep 10 09:27:22 2016 +0800

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 79 +++++++++++++++-----
 .../spark/sql/sources/BucketedReadSuite.scala   | 63 +++++++++++++++-
 2 files changed, 123 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33549170/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 9597bdf..6cdba40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,12 +23,11 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -156,24 +155,72 @@ case class FileSourceScanExec(
     false
   }
 
-  override val outputPartitioning: Partitioning = {
+  @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+
+  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
       relation.bucketSpec
     } else {
       None
     }
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
-        output.find(_.name == n)
-      }
-      if (bucketColumns.size == spec.bucketColumnNames.size) {
-        HashPartitioning(bucketColumns, numBuckets)
-      } else {
-        UnknownPartitioning(0)
-      }
-    }.getOrElse {
-      UnknownPartitioning(0)
+    bucketSpec match {
+      case Some(spec) =>
+        // For bucketed columns:
+        // -----------------------
+        // `HashPartitioning` would be used only when:
+        // 1. ALL the bucketing columns are being read from the table
+        //
+        // For sorted columns:
+        // ---------------------
+        // Sort ordering should be used when ALL these criteria's match:
+        // 1. `HashPartitioning` is being used
+        // 2. A prefix (or all) of the sort columns are being read from the table.
+        //
+        // Sort ordering would be over the prefix subset of `sort columns` being read
+        // from the table.
+        // eg.
+        // Assume (col0, col2, col3) are the columns read from the table
+        // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
+        // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
+        // above
+
+        def toAttribute(colName: String): Option[Attribute] =
+          output.find(_.name == colName)
+
+        val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+        if (bucketColumns.size == spec.bucketColumnNames.size) {
+          val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+          val sortColumns =
+            spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+
+          val sortOrder = if (sortColumns.nonEmpty) {
+            // In case of bucketing, its possible to have multiple files belonging to the
+            // same bucket in a given relation. Each of these files are locally sorted
+            // but those files combined together are not globally sorted. Given that,
+            // the RDD partition will not be sorted even if the relation has sort columns set
+            // Current solution is to check if all the buckets have a single file in it
+
+            val files = selectedPartitions.flatMap(partition => partition.files)
+            val bucketToFilesGrouping =
+              files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+            val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+            if (singleFilePartitions) {
+              // TODO Currently Spark does not support writing columns sorting in descending order
+              // so using Ascending order. This can be fixed in future
+              sortColumns.map(attribute => SortOrder(attribute, Ascending))
+            } else {
+              Nil
+            }
+          } else {
+            Nil
+          }
+          (partitioning, sortOrder)
+        } else {
+          (UnknownPartitioning(0), Nil)
+        }
+      case _ =>
+        (UnknownPartitioning(0), Nil)
     }
   }
 
@@ -187,8 +234,6 @@ case class FileSourceScanExec(
     "InputPaths" -> relation.location.paths.mkString(", "))
 
   private lazy val inputRDD: RDD[InternalRow] = {
-    val selectedPartitions = relation.location.listFiles(partitionFilters)
-
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,

http://git-wip-us.apache.org/repos/asf/spark/blob/33549170/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index ca2ec9f..3ff8517 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -237,7 +237,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
       bucketSpecRight: Option[BucketSpec],
       joinColumns: Seq[String],
       shuffleLeft: Boolean,
-      shuffleRight: Boolean): Unit = {
+      shuffleRight: Boolean,
+      sortLeft: Boolean = true,
+      sortRight: Boolean = true): Unit = {
     withTable("bucketed_table1", "bucketed_table2") {
       def withBucket(
           writer: DataFrameWriter[Row],
@@ -247,6 +249,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
             spec.numBuckets,
             spec.bucketColumnNames.head,
             spec.bucketColumnNames.tail: _*)
+
+          if (spec.sortColumnNames.nonEmpty) {
+            writer.sortBy(
+              spec.sortColumnNames.head,
+              spec.sortColumnNames.tail: _*
+            )
+          } else {
+            writer
+          }
         }.getOrElse(writer)
       }
 
@@ -267,12 +278,21 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
         assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
         val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
 
+        // check existence of shuffle
         assert(
           joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
           s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}")
         assert(
           joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight,
           s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}")
+
+        // check existence of sort
+        assert(
+          joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
+          s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}")
+        assert(
+          joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
+          s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}")
       }
     }
   }
@@ -321,6 +341,45 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     }
   }
 
+  test("avoid shuffle and sort when bucket and sort columns are join keys") {
+    val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    testBucketing(
+      bucketSpec, bucketSpec, Seq("i", "j"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = false
+    )
+  }
+
+  test("avoid shuffle and sort when sort columns are a super set of join keys") {
+    val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
+    val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
+    testBucketing(
+      bucketSpec1, bucketSpec2, Seq("i"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = false
+    )
+  }
+
+  test("only sort one side when sort columns are different") {
+    val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
+    testBucketing(
+      bucketSpec1, bucketSpec2, Seq("i", "j"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = true
+    )
+  }
+
+  test("only sort one side when sort columns are same but their ordering is different") {
+    val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
+    testBucketing(
+      bucketSpec1, bucketSpec2, Seq("i", "j"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = true
+    )
+  }
+
   test("avoid shuffle when grouping keys are equal to bucket keys") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org