You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/02/20 17:04:28 UTC

spark git commit: [SPARK-15453][SQL][FOLLOW-UP] FileSourceScanExec to extract `outputOrdering` information

Repository: spark
Updated Branches:
  refs/heads/master d0ecca607 -> ead4ba0eb


[SPARK-15453][SQL][FOLLOW-UP] FileSourceScanExec to extract `outputOrdering` information

### What changes were proposed in this pull request?
`outputOrdering` is also dependent on whether the bucket has more than one file. The test cases fail when we try to move them to sql/core. This PR is to fix the test cases introduced in https://github.com/apache/spark/pull/14864 and add a test case to verify [the related logic](https://github.com/tejasapatil/spark/blob/070c24994747c0479fb2520774ede27ff1cf8cac/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L197-L206).

### How was this patch tested?
N/A

Author: Xiao Li <ga...@gmail.com>

Closes #16994 from gatorsmile/bucketingTS.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ead4ba0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ead4ba0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ead4ba0e

Branch: refs/heads/master
Commit: ead4ba0eb5841e42e6a57c1a1865bf89564e8ff9
Parents: d0ecca6
Author: Xiao Li <ga...@gmail.com>
Authored: Mon Feb 20 09:04:22 2017 -0800
Committer: Xiao Li <ga...@gmail.com>
Committed: Mon Feb 20 09:04:22 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/sources/BucketedReadSuite.scala   | 229 +++++++++++--------
 1 file changed, 137 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ead4ba0e/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index d9ddcbd..4fc72b9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -227,6 +227,12 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
   private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
 
+  case class BucketedTableTestSpec(
+      bucketSpec: Option[BucketSpec],
+      numPartitions: Int = 10,
+      expectedShuffle: Boolean = true,
+      expectedSort: Boolean = true)
+
   /**
    * A helper method to test the bucket read functionality using join.  It will save `df1` and `df2`
    * to hive tables, bucketed or not, according to the given bucket specifics.  Next we will join
@@ -234,14 +240,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
    * exists as user expected according to the `shuffleLeft` and `shuffleRight`.
    */
   private def testBucketing(
-      bucketSpecLeft: Option[BucketSpec],
-      bucketSpecRight: Option[BucketSpec],
+      bucketedTableTestSpecLeft: BucketedTableTestSpec,
+      bucketedTableTestSpecRight: BucketedTableTestSpec,
       joinType: String = "inner",
-      joinCondition: (DataFrame, DataFrame) => Column,
-      shuffleLeft: Boolean,
-      shuffleRight: Boolean,
-      sortLeft: Boolean = true,
-      sortRight: Boolean = true): Unit = {
+      joinCondition: (DataFrame, DataFrame) => Column): Unit = {
+    val BucketedTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) =
+      bucketedTableTestSpecLeft
+    val BucketedTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) =
+      bucketedTableTestSpecRight
+
     withTable("bucketed_table1", "bucketed_table2") {
       def withBucket(
           writer: DataFrameWriter[Row],
@@ -263,8 +270,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
         }.getOrElse(writer)
       }
 
-      withBucket(df1.write.format("parquet"), bucketSpecLeft).saveAsTable("bucketed_table1")
-      withBucket(df2.write.format("parquet"), bucketSpecRight).saveAsTable("bucketed_table2")
+      withBucket(df1.repartition(numPartitionsLeft).write.format("parquet"), bucketSpecLeft)
+        .saveAsTable("bucketed_table1")
+      withBucket(df2.repartition(numPartitionsRight).write.format("parquet"), bucketSpecRight)
+        .saveAsTable("bucketed_table2")
 
       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
         SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
@@ -291,10 +300,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
         // check existence of sort
         assert(
           joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
-          s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}")
+          s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}")
         assert(
           joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
-          s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}")
+          s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
       }
     }
   }
@@ -305,138 +314,174 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
 
   test("avoid shuffle when join 2 bucketed tables") {
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
     testBucketing(
-      bucketSpecLeft = bucketSpec,
-      bucketSpecRight = bucketSpec,
-      joinCondition = joinCondition(Seq("i", "j")),
-      shuffleLeft = false,
-      shuffleRight = false
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("i", "j"))
     )
   }
 
   // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704
   ignore("avoid shuffle when join keys are a super-set of bucket keys") {
     val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
     testBucketing(
-      bucketSpecLeft = bucketSpec,
-      bucketSpecRight = bucketSpec,
-      joinCondition = joinCondition(Seq("i", "j")),
-      shuffleLeft = false,
-      shuffleRight = false
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("i", "j"))
     )
   }
 
   test("only shuffle one side when join bucketed table and non-bucketed table") {
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
     testBucketing(
-      bucketSpecLeft = bucketSpec,
-      bucketSpecRight = None,
-      joinCondition = joinCondition(Seq("i", "j")),
-      shuffleLeft = false,
-      shuffleRight = true
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("i", "j"))
     )
   }
 
   test("only shuffle one side when 2 bucketed tables have different bucket number") {
-    val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Nil))
-    val bucketSpec2 = Some(BucketSpec(5, Seq("i", "j"), Nil))
+    val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Nil))
+    val bucketSpecRight = Some(BucketSpec(5, Seq("i", "j"), Nil))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true)
     testBucketing(
-      bucketSpecLeft = bucketSpec1,
-      bucketSpecRight = bucketSpec2,
-      joinCondition = joinCondition(Seq("i", "j")),
-      shuffleLeft = false,
-      shuffleRight = true
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("i", "j"))
     )
   }
 
   test("only shuffle one side when 2 bucketed tables have different bucket keys") {
-    val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Nil))
-    val bucketSpec2 = Some(BucketSpec(8, Seq("j"), Nil))
+    val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Nil))
+    val bucketSpecRight = Some(BucketSpec(8, Seq("j"), Nil))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true)
     testBucketing(
-      bucketSpecLeft = bucketSpec1,
-      bucketSpecRight = bucketSpec2,
-      joinCondition = joinCondition(Seq("i")),
-      shuffleLeft = false,
-      shuffleRight = true
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("i"))
     )
   }
 
   test("shuffle when join keys are not equal to bucket keys") {
     val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
     testBucketing(
-      bucketSpecLeft = bucketSpec,
-      bucketSpecRight = bucketSpec,
-      joinCondition = joinCondition(Seq("j")),
-      shuffleLeft = true,
-      shuffleRight = true
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("j"))
     )
   }
 
   test("shuffle when join 2 bucketed tables with bucketing disabled") {
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
     withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
       testBucketing(
-        bucketSpecLeft = bucketSpec,
-        bucketSpecRight = bucketSpec,
-        joinCondition = joinCondition(Seq("i", "j")),
-        shuffleLeft = true,
-        shuffleRight = true
+        bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+        bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+        joinCondition = joinCondition(Seq("i", "j"))
       )
     }
   }
 
-  test("avoid shuffle and sort when bucket and sort columns are join keys") {
+  test("check sort and shuffle when bucket and sort columns are join keys") {
+    // In case of bucketing, its possible to have multiple files belonging to the
+    // same bucket in a given relation. Each of these files are locally sorted
+    // but those files combined together are not globally sorted. Given that,
+    // the RDD partition will not be sorted even if the relation has sort columns set
+    // Therefore, we still need to keep the Sort in both sides.
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+
+    val bucketedTableTestSpecLeft1 = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
+    val bucketedTableTestSpecRight1 = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
     testBucketing(
-      bucketSpecLeft = bucketSpec,
-      bucketSpecRight = bucketSpec,
-      joinCondition = joinCondition(Seq("i", "j")),
-      shuffleLeft = false,
-      shuffleRight = false,
-      sortLeft = false,
-      sortRight = false
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight1,
+      joinCondition = joinCondition(Seq("i", "j"))
+    )
+
+    val bucketedTableTestSpecLeft2 = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+    val bucketedTableTestSpecRight2 = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
+    testBucketing(
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight2,
+      joinCondition = joinCondition(Seq("i", "j"))
+    )
+
+    val bucketedTableTestSpecLeft3 = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
+    val bucketedTableTestSpecRight3 = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
+    testBucketing(
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight3,
+      joinCondition = joinCondition(Seq("i", "j"))
+    )
+
+    val bucketedTableTestSpecLeft4 = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+    val bucketedTableTestSpecRight4 = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+    testBucketing(
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight4,
+      joinCondition = joinCondition(Seq("i", "j"))
     )
   }
 
   test("avoid shuffle and sort when sort columns are a super set of join keys") {
-    val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
-    val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
+    val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
+    val bucketSpecRight = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(
+      bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(
+      bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = false)
     testBucketing(
-      bucketSpecLeft = bucketSpec1,
-      bucketSpecRight = bucketSpec2,
-      joinCondition = joinCondition(Seq("i")),
-      shuffleLeft = false,
-      shuffleRight = false,
-      sortLeft = false,
-      sortRight = false
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("i"))
     )
   }
 
   test("only sort one side when sort columns are different") {
-    val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
-    val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
+    val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(
+      bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(
+      bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true)
     testBucketing(
-      bucketSpecLeft = bucketSpec1,
-      bucketSpecRight = bucketSpec2,
-      joinCondition = joinCondition(Seq("i", "j")),
-      shuffleLeft = false,
-      shuffleRight = false,
-      sortLeft = false,
-      sortRight = true
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("i", "j"))
     )
   }
 
   test("only sort one side when sort columns are same but their ordering is different") {
-    val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
-    val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
+    val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(
+      bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(
+      bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true)
     testBucketing(
-      bucketSpecLeft = bucketSpec1,
-      bucketSpecRight = bucketSpec2,
-      joinCondition = joinCondition(Seq("i", "j")),
-      shuffleLeft = false,
-      shuffleRight = false,
-      sortLeft = false,
-      sortRight = true
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("i", "j"))
     )
   }
 
@@ -470,20 +515,20 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
 
   test("SPARK-17698 Join predicates should not contain filter clauses") {
     val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i")))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
     testBucketing(
-      bucketSpecLeft = bucketSpec,
-      bucketSpecRight = bucketSpec,
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
       joinType = "fullouter",
       joinCondition = (left: DataFrame, right: DataFrame) => {
         val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _)
         val filterLeft = left("i") === Literal("1")
         val filterRight = right("i") === Literal("1")
         joinPredicates && filterLeft && filterRight
-      },
-      shuffleLeft = false,
-      shuffleRight = false,
-      sortLeft = false,
-      sortRight = false
+      }
     )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org