You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/29 02:42:44 UTC

spark git commit: [SPARK-9418][SQL] Use sort-merge join as the default shuffle join.

Repository: spark
Updated Branches:
  refs/heads/master b7f54119f -> 6662ee212


[SPARK-9418][SQL] Use sort-merge join as the default shuffle join.

Sort-merge join is more robust in Spark since sorting can be made using the Tungsten sort operator.

Author: Reynold Xin <rx...@databricks.com>

Closes #7733 from rxin/smj and squashes the following commits:

61e4d34 [Reynold Xin] Fixed test case.
5ffd731 [Reynold Xin] Fixed JoinSuite.
a137dc0 [Reynold Xin] [SPARK-9418][SQL] Use sort-merge join as the default shuffle join.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6662ee21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6662ee21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6662ee21

Branch: refs/heads/master
Commit: 6662ee21244067180c1bcef0b16107b2979fd933
Parents: b7f5411
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Jul 28 17:42:35 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jul 28 17:42:35 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |   2 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |   6 +-
 .../execution/HashJoinCompatibilitySuite.scala  | 169 +++++++++++++++++++
 .../execution/SortMergeCompatibilitySuite.scala | 169 -------------------
 .../apache/spark/sql/hive/StatisticsSuite.scala |   2 +-
 5 files changed, 174 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6662ee21/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 40eba33..cdb0c7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -322,7 +322,7 @@ private[spark] object SQLConf {
       " memory.")
 
   val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
-    defaultValue = Some(false),
+    defaultValue = Some(true),
     doc = "When true, use sort merge join (as opposed to hash join) by default for large joins.")
 
   // This is only used for the thriftserver

http://git-wip-us.apache.org/repos/asf/spark/blob/6662ee21/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index dfb2a7e..666f26b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -79,9 +79,9 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
       ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2", classOf[CartesianProduct]),
       ("SELECT * FROM testData JOIN testData2 WHERE key > a", classOf[CartesianProduct]),
       ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a", classOf[CartesianProduct]),
-      ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
-      ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[ShuffledHashJoin]),
-      ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[ShuffledHashJoin]),
+      ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
+      ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
+      ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]),
       ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[ShuffledHashOuterJoin]),
       ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
         classOf[ShuffledHashOuterJoin]),

http://git-wip-us.apache.org/repos/asf/spark/blob/6662ee21/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
new file mode 100644
index 0000000..1a5ba20
--- /dev/null
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.hive.test.TestHive
+
+/**
+ * Runs the test cases that are included in the hive distribution with hash joins.
+ */
+class HashJoinCompatibilitySuite extends HiveCompatibilitySuite {
+  override def beforeAll() {
+    super.beforeAll()
+    TestHive.setConf(SQLConf.SORTMERGE_JOIN, false)
+  }
+
+  override def afterAll() {
+    TestHive.setConf(SQLConf.SORTMERGE_JOIN, true)
+    super.afterAll()
+  }
+
+  override def whiteList = Seq(
+    "auto_join0",
+    "auto_join1",
+    "auto_join10",
+    "auto_join11",
+    "auto_join12",
+    "auto_join13",
+    "auto_join14",
+    "auto_join14_hadoop20",
+    "auto_join15",
+    "auto_join17",
+    "auto_join18",
+    "auto_join19",
+    "auto_join2",
+    "auto_join20",
+    "auto_join21",
+    "auto_join22",
+    "auto_join23",
+    "auto_join24",
+    "auto_join25",
+    "auto_join26",
+    "auto_join27",
+    "auto_join28",
+    "auto_join3",
+    "auto_join30",
+    "auto_join31",
+    "auto_join32",
+    "auto_join4",
+    "auto_join5",
+    "auto_join6",
+    "auto_join7",
+    "auto_join8",
+    "auto_join9",
+    "auto_join_filters",
+    "auto_join_nulls",
+    "auto_join_reordering_values",
+    "auto_smb_mapjoin_14",
+    "auto_sortmerge_join_1",
+    "auto_sortmerge_join_10",
+    "auto_sortmerge_join_11",
+    "auto_sortmerge_join_12",
+    "auto_sortmerge_join_13",
+    "auto_sortmerge_join_14",
+    "auto_sortmerge_join_15",
+    "auto_sortmerge_join_16",
+    "auto_sortmerge_join_2",
+    "auto_sortmerge_join_3",
+    "auto_sortmerge_join_4",
+    "auto_sortmerge_join_5",
+    "auto_sortmerge_join_6",
+    "auto_sortmerge_join_7",
+    "auto_sortmerge_join_8",
+    "auto_sortmerge_join_9",
+    "correlationoptimizer1",
+    "correlationoptimizer10",
+    "correlationoptimizer11",
+    "correlationoptimizer13",
+    "correlationoptimizer14",
+    "correlationoptimizer15",
+    "correlationoptimizer2",
+    "correlationoptimizer3",
+    "correlationoptimizer4",
+    "correlationoptimizer6",
+    "correlationoptimizer7",
+    "correlationoptimizer8",
+    "correlationoptimizer9",
+    "join0",
+    "join1",
+    "join10",
+    "join11",
+    "join12",
+    "join13",
+    "join14",
+    "join14_hadoop20",
+    "join15",
+    "join16",
+    "join17",
+    "join18",
+    "join19",
+    "join2",
+    "join20",
+    "join21",
+    "join22",
+    "join23",
+    "join24",
+    "join25",
+    "join26",
+    "join27",
+    "join28",
+    "join29",
+    "join3",
+    "join30",
+    "join31",
+    "join32",
+    "join32_lessSize",
+    "join33",
+    "join34",
+    "join35",
+    "join36",
+    "join37",
+    "join38",
+    "join39",
+    "join4",
+    "join40",
+    "join41",
+    "join5",
+    "join6",
+    "join7",
+    "join8",
+    "join9",
+    "join_1to1",
+    "join_array",
+    "join_casesensitive",
+    "join_empty",
+    "join_filters",
+    "join_hive_626",
+    "join_map_ppr",
+    "join_nulls",
+    "join_nullsafe",
+    "join_rc",
+    "join_reorder2",
+    "join_reorder3",
+    "join_reorder4",
+    "join_star"
+  )
+
+  // Only run those query tests in the realWhileList (do not try other ignored query files).
+  override def testCases: Seq[(String, File)] = super.testCases.filter {
+    case (name, _) => realWhiteList.contains(name)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6662ee21/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala
deleted file mode 100644
index 1fe4fe9..0000000
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import java.io.File
-
-import org.apache.spark.sql.SQLConf
-import org.apache.spark.sql.hive.test.TestHive
-
-/**
- * Runs the test cases that are included in the hive distribution with sort merge join is true.
- */
-class SortMergeCompatibilitySuite extends HiveCompatibilitySuite {
-  override def beforeAll() {
-    super.beforeAll()
-    TestHive.setConf(SQLConf.SORTMERGE_JOIN, true)
-  }
-
-  override def afterAll() {
-    TestHive.setConf(SQLConf.SORTMERGE_JOIN, false)
-    super.afterAll()
-  }
-
-  override def whiteList = Seq(
-    "auto_join0",
-    "auto_join1",
-    "auto_join10",
-    "auto_join11",
-    "auto_join12",
-    "auto_join13",
-    "auto_join14",
-    "auto_join14_hadoop20",
-    "auto_join15",
-    "auto_join17",
-    "auto_join18",
-    "auto_join19",
-    "auto_join2",
-    "auto_join20",
-    "auto_join21",
-    "auto_join22",
-    "auto_join23",
-    "auto_join24",
-    "auto_join25",
-    "auto_join26",
-    "auto_join27",
-    "auto_join28",
-    "auto_join3",
-    "auto_join30",
-    "auto_join31",
-    "auto_join32",
-    "auto_join4",
-    "auto_join5",
-    "auto_join6",
-    "auto_join7",
-    "auto_join8",
-    "auto_join9",
-    "auto_join_filters",
-    "auto_join_nulls",
-    "auto_join_reordering_values",
-    "auto_smb_mapjoin_14",
-    "auto_sortmerge_join_1",
-    "auto_sortmerge_join_10",
-    "auto_sortmerge_join_11",
-    "auto_sortmerge_join_12",
-    "auto_sortmerge_join_13",
-    "auto_sortmerge_join_14",
-    "auto_sortmerge_join_15",
-    "auto_sortmerge_join_16",
-    "auto_sortmerge_join_2",
-    "auto_sortmerge_join_3",
-    "auto_sortmerge_join_4",
-    "auto_sortmerge_join_5",
-    "auto_sortmerge_join_6",
-    "auto_sortmerge_join_7",
-    "auto_sortmerge_join_8",
-    "auto_sortmerge_join_9",
-    "correlationoptimizer1",
-    "correlationoptimizer10",
-    "correlationoptimizer11",
-    "correlationoptimizer13",
-    "correlationoptimizer14",
-    "correlationoptimizer15",
-    "correlationoptimizer2",
-    "correlationoptimizer3",
-    "correlationoptimizer4",
-    "correlationoptimizer6",
-    "correlationoptimizer7",
-    "correlationoptimizer8",
-    "correlationoptimizer9",
-    "join0",
-    "join1",
-    "join10",
-    "join11",
-    "join12",
-    "join13",
-    "join14",
-    "join14_hadoop20",
-    "join15",
-    "join16",
-    "join17",
-    "join18",
-    "join19",
-    "join2",
-    "join20",
-    "join21",
-    "join22",
-    "join23",
-    "join24",
-    "join25",
-    "join26",
-    "join27",
-    "join28",
-    "join29",
-    "join3",
-    "join30",
-    "join31",
-    "join32",
-    "join32_lessSize",
-    "join33",
-    "join34",
-    "join35",
-    "join36",
-    "join37",
-    "join38",
-    "join39",
-    "join4",
-    "join40",
-    "join41",
-    "join5",
-    "join6",
-    "join7",
-    "join8",
-    "join9",
-    "join_1to1",
-    "join_array",
-    "join_casesensitive",
-    "join_empty",
-    "join_filters",
-    "join_hive_626",
-    "join_map_ppr",
-    "join_nulls",
-    "join_nullsafe",
-    "join_rc",
-    "join_reorder2",
-    "join_reorder3",
-    "join_reorder4",
-    "join_star"
-  )
-
-  // Only run those query tests in the realWhileList (do not try other ignored query files).
-  override def testCases: Seq[(String, File)] = super.testCases.filter {
-    case (name, _) => realWhiteList.contains(name)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6662ee21/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index f067ea0..bc72b01 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -172,7 +172,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
         bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
         assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
 
-        val shj = df.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j }
+        val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoin => j }
         assert(shj.size === 1,
           "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org