You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/02/15 16:10:55 UTC

spark git commit: [SPARK-16475][SQL] broadcast hint for SQL queries - follow up

Repository: spark
Updated Branches:
  refs/heads/master b55563c17 -> 733c59ec1


[SPARK-16475][SQL] broadcast hint for SQL queries - follow up

## What changes were proposed in this pull request?
A small update to https://github.com/apache/spark/pull/16925

1. Rename SubstituteHints -> ResolveHints to be more consistent with rest of the rules.
2. Added more documentation in the rule and be more defensive / future proof to skip views as well as CTEs.

## How was this patch tested?
This pull request contains no real logic change and all behavior should be covered by existing tests.

Author: Reynold Xin <rx...@databricks.com>

Closes #16939 from rxin/SPARK-16475.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/733c59ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/733c59ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/733c59ec

Branch: refs/heads/master
Commit: 733c59ec1ee5746c322e68459cd06241f5fa0903
Parents: b55563c
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Feb 15 17:10:49 2017 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Wed Feb 15 17:10:49 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   4 +-
 .../sql/catalyst/analysis/ResolveHints.scala    | 103 ++++++++++++++++
 .../sql/catalyst/analysis/SubstituteHints.scala | 104 ----------------
 .../catalyst/analysis/ResolveHintsSuite.scala   | 120 ++++++++++++++++++
 .../analysis/SubstituteHintsSuite.scala         | 121 -------------------
 5 files changed, 225 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8348cb5..6aa0e8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -115,8 +115,8 @@ class Analyzer(
 
   lazy val batches: Seq[Batch] = Seq(
     Batch("Hints", fixedPoint,
-      new SubstituteHints.SubstituteBroadcastHints(conf),
-      SubstituteHints.RemoveAllHints),
+      new ResolveHints.ResolveBroadcastHints(conf),
+      ResolveHints.RemoveAllHints),
     Batch("Substitution", fixedPoint,
       CTESubstitution,
       WindowsSubstitution,

http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
new file mode 100644
index 0000000..2124177
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+
+
+/**
+ * Collection of rules related to hints. The only hint currently available is broadcast join hint.
+ *
+ * Note that this is separated into two rules because in the future we might introduce new hint
+ * rules that have different ordering requirements from broadcast.
+ */
+object ResolveHints {
+
+  /**
+   * For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
+   * relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
+   * on top of any relation (that is not aliased differently), subquery, or common table expression
+   * that match the specified name.
+   *
+   * The hint resolution works by recursively traversing down the query plan to find a relation or
+   * subquery that matches one of the specified broadcast aliases. The traversal does not go
+   * beyond any existing broadcast hints or subquery aliases.
+   *
+   * This rule must happen before common table expressions.
+   */
+  class ResolveBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] {
+    private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
+
+    def resolver: Resolver = conf.resolver
+
+    private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
+      // Whether to continue recursing down the tree
+      var recurse = true
+
+      val newNode = CurrentOrigin.withOrigin(plan.origin) {
+        plan match {
+          case r: UnresolvedRelation =>
+            val alias = r.alias.getOrElse(r.tableIdentifier.table)
+            if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan
+
+          case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
+            BroadcastHint(plan)
+
+          case _: BroadcastHint | _: View | _: With | _: SubqueryAlias =>
+            // Don't traverse down these nodes.
+            // For an existing broadcast hint, there is no point going down (if we do, we either
+            // won't change the structure, or will introduce another broadcast hint that is useless).
+            // The rest (view, with, subquery) indicates different scopes that we shouldn't traverse
+            // down. Note that technically when this rule is executed, we haven't completed view
+            // resolution yet and as a result the view part should be dead code. I'm leaving it here
+            // to be more future proof in case we change the way we do view resolution.
+            recurse = false
+            plan
+
+          case _ =>
+            plan
+        }
+      }
+
+      if ((plan fastEquals newNode) && recurse) {
+        newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
+      } else {
+        newNode
+      }
+    }
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+      case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) =>
+        applyBroadcastHint(h.child, h.parameters.toSet)
+    }
+  }
+
+  /**
+   * Removes all the hints, used to remove invalid hints provided by the user.
+   * This must be executed after all the other hint rules are executed.
+   */
+  object RemoveAllHints extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+      case h: Hint => h.child
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala
deleted file mode 100644
index fda4d1b..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.analysis
-
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
-
-
-/**
- * Collection of rules related to hints. The only hint currently available is broadcast join hint.
- *
- * Note that this is separatedly into two rules because in the future we might introduce new hint
- * rules that have different ordering requirements from broadcast.
- */
-object SubstituteHints {
-
-  /**
-   * Substitute Hints.
-   *
-   * The only hint currently available is broadcast join hint.
-   *
-   * For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
-   * relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
-   * on top of any relation (that is not aliased differently), subquery, or common table expression
-   * that match the specified name.
-   *
-   * The hint resolution works by recursively traversing down the query plan to find a relation or
-   * subquery that matches one of the specified broadcast aliases. The traversal does not go past
-   * beyond any existing broadcast hints, subquery aliases.
-   *
-   * This rule must happen before common table expressions.
-   */
-  class SubstituteBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] {
-    private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
-
-    def resolver: Resolver = conf.resolver
-
-    private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
-      // Whether to continue recursing down the tree
-      var recurse = true
-
-      val newNode = CurrentOrigin.withOrigin(plan.origin) {
-        plan match {
-          case r: UnresolvedRelation =>
-            val alias = r.alias.getOrElse(r.tableIdentifier.table)
-            if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan
-          case r: SubqueryAlias =>
-            if (toBroadcast.exists(resolver(_, r.alias))) {
-              BroadcastHint(plan)
-            } else {
-              // Don't recurse down subquery aliases if there are no match.
-              recurse = false
-              plan
-            }
-          case _: BroadcastHint =>
-            // Found a broadcast hint; don't change the plan but also don't recurse down.
-            recurse = false
-            plan
-          case _ =>
-            plan
-        }
-      }
-
-      if ((plan fastEquals newNode) && recurse) {
-        newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
-      } else {
-        newNode
-      }
-    }
-
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) =>
-        applyBroadcastHint(h.child, h.parameters.toSet)
-    }
-  }
-
-  /**
-   * Removes all the hints, used to remove invalid hints provided by the user.
-   * This must be executed after all the other hint rules are executed.
-   */
-  object RemoveAllHints extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case h: Hint => h.child
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
new file mode 100644
index 0000000..d101e22
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class ResolveHintsSuite extends AnalysisTest {
+  import org.apache.spark.sql.catalyst.analysis.TestRelations._
+
+  test("invalid hints should be ignored") {
+    checkAnalysis(
+      Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
+      testRelation,
+      caseSensitive = false)
+  }
+
+  test("case-sensitive or insensitive parameters") {
+    checkAnalysis(
+      Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
+      BroadcastHint(testRelation),
+      caseSensitive = false)
+
+    checkAnalysis(
+      Hint("MAPJOIN", Seq("table"), table("TaBlE")),
+      BroadcastHint(testRelation),
+      caseSensitive = false)
+
+    checkAnalysis(
+      Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
+      BroadcastHint(testRelation),
+      caseSensitive = true)
+
+    checkAnalysis(
+      Hint("MAPJOIN", Seq("table"), table("TaBlE")),
+      testRelation,
+      caseSensitive = true)
+  }
+
+  test("multiple broadcast hint aliases") {
+    checkAnalysis(
+      Hint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))),
+      Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, None),
+      caseSensitive = false)
+  }
+
+  test("do not traverse past existing broadcast hints") {
+    checkAnalysis(
+      Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 1))),
+      BroadcastHint(testRelation.where('a > 1)).analyze,
+      caseSensitive = false)
+  }
+
+  test("should work for subqueries") {
+    checkAnalysis(
+      Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
+      BroadcastHint(testRelation),
+      caseSensitive = false)
+
+    checkAnalysis(
+      Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
+      BroadcastHint(testRelation),
+      caseSensitive = false)
+
+    // Negative case: if the alias doesn't match, don't match the original table name.
+    checkAnalysis(
+      Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
+      testRelation,
+      caseSensitive = false)
+  }
+
+  test("do not traverse past subquery alias") {
+    checkAnalysis(
+      Hint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)),
+      testRelation.where('a > 1).analyze,
+      caseSensitive = false)
+  }
+
+  test("should work for CTE") {
+    checkAnalysis(
+      CatalystSqlParser.parsePlan(
+        """
+          |WITH ctetable AS (SELECT * FROM table WHERE a > 1)
+          |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
+        """.stripMargin
+      ),
+      BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze,
+      caseSensitive = false)
+  }
+
+  test("should not traverse down CTE") {
+    checkAnalysis(
+      CatalystSqlParser.parsePlan(
+        """
+          |WITH ctetable AS (SELECT * FROM table WHERE a > 1)
+          |SELECT /*+ BROADCAST(table) */ * FROM ctetable
+        """.stripMargin
+      ),
+      testRelation.where('a > 1).select('a).select('a).analyze,
+      caseSensitive = false)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala
deleted file mode 100644
index 9d671f3..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.analysis
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.plans.logical._
-
-class SubstituteHintsSuite extends AnalysisTest {
-  import org.apache.spark.sql.catalyst.analysis.TestRelations._
-
-  test("invalid hints should be ignored") {
-    checkAnalysis(
-      Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
-      testRelation,
-      caseSensitive = false)
-  }
-
-  test("case-sensitive or insensitive parameters") {
-    checkAnalysis(
-      Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      BroadcastHint(testRelation),
-      caseSensitive = false)
-
-    checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("TaBlE")),
-      BroadcastHint(testRelation),
-      caseSensitive = false)
-
-    checkAnalysis(
-      Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      BroadcastHint(testRelation),
-      caseSensitive = true)
-
-    checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("TaBlE")),
-      testRelation,
-      caseSensitive = true)
-  }
-
-  test("multiple broadcast hint aliases") {
-    checkAnalysis(
-      Hint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))),
-      Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, None),
-      caseSensitive = false)
-  }
-
-  test("do not traverse past existing broadcast hints") {
-    checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 1))),
-      BroadcastHint(testRelation.where('a > 1)).analyze,
-      caseSensitive = false)
-  }
-
-  test("should work for subqueries") {
-    checkAnalysis(
-      Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
-      BroadcastHint(testRelation),
-      caseSensitive = false)
-
-    checkAnalysis(
-      Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
-      BroadcastHint(testRelation),
-      caseSensitive = false)
-
-    // Negative case: if the alias doesn't match, don't match the original table name.
-    checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
-      testRelation,
-      caseSensitive = false)
-  }
-
-  test("do not traverse past subquery alias") {
-    checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)),
-      testRelation.where('a > 1).analyze,
-      caseSensitive = false)
-  }
-
-  test("should work for CTE") {
-    checkAnalysis(
-      CatalystSqlParser.parsePlan(
-        """
-          |WITH ctetable AS (SELECT * FROM table WHERE a > 1)
-          |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
-        """.stripMargin
-      ),
-      BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze,
-      caseSensitive = false)
-  }
-
-  test("should not traverse down CTE") {
-    checkAnalysis(
-      CatalystSqlParser.parsePlan(
-        """
-          |WITH ctetable AS (SELECT * FROM table WHERE a > 1)
-          |SELECT /*+ BROADCAST(table) */ * FROM ctetable
-        """.stripMargin
-      ),
-      testRelation.where('a > 1).select('a).select('a).analyze,
-      caseSensitive = false)
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org