You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/02/15 16:10:55 UTC
spark git commit: [SPARK-16475][SQL] broadcast hint for SQL queries -
follow up
Repository: spark
Updated Branches:
refs/heads/master b55563c17 -> 733c59ec1
[SPARK-16475][SQL] broadcast hint for SQL queries - follow up
## What changes were proposed in this pull request?
A small update to https://github.com/apache/spark/pull/16925
1. Rename SubstituteHints -> ResolveHints to be more consistent with rest of the rules.
2. Added more documentation to the rule and made it more defensive / future-proof by skipping views as well as CTEs.
## How was this patch tested?
This pull request contains no real logic change and all behavior should be covered by existing tests.
Author: Reynold Xin <rx...@databricks.com>
Closes #16939 from rxin/SPARK-16475.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/733c59ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/733c59ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/733c59ec
Branch: refs/heads/master
Commit: 733c59ec1ee5746c322e68459cd06241f5fa0903
Parents: b55563c
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Feb 15 17:10:49 2017 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Wed Feb 15 17:10:49 2017 +0100
----------------------------------------------------------------------
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 +-
.../sql/catalyst/analysis/ResolveHints.scala | 103 ++++++++++++++++
.../sql/catalyst/analysis/SubstituteHints.scala | 104 ----------------
.../catalyst/analysis/ResolveHintsSuite.scala | 120 ++++++++++++++++++
.../analysis/SubstituteHintsSuite.scala | 121 -------------------
5 files changed, 225 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8348cb5..6aa0e8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -115,8 +115,8 @@ class Analyzer(
lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
- new SubstituteHints.SubstituteBroadcastHints(conf),
- SubstituteHints.RemoveAllHints),
+ new ResolveHints.ResolveBroadcastHints(conf),
+ ResolveHints.RemoveAllHints),
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
new file mode 100644
index 0000000..2124177
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+
+
+/**
+ * Collection of rules related to hints. The only hint currently available is broadcast join hint.
+ *
+ * Note that this is separated into two rules because in the future we might introduce new hint
+ * rules that have different ordering requirements from broadcast.
+ */
+object ResolveHints {
+
+ /**
+ * For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
+ * relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
+ * on top of any relation (that is not aliased differently), subquery, or common table expression
+ * that match the specified name.
+ *
+ * The hint resolution works by recursively traversing down the query plan to find a relation or
+ * subquery that matches one of the specified broadcast aliases. The traversal does not go
+ * beyond any existing broadcast hints, subquery aliases, views, or common table expressions.
+ *
+ * This rule must happen before common table expressions.
+ */
+ class ResolveBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] {
+ private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
+
+ def resolver: Resolver = conf.resolver
+
+ private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
+ // Whether to continue recursing down the tree
+ var recurse = true
+
+ val newNode = CurrentOrigin.withOrigin(plan.origin) {
+ plan match {
+ case r: UnresolvedRelation =>
+ val alias = r.alias.getOrElse(r.tableIdentifier.table)
+ if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan
+
+ case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
+ BroadcastHint(plan)
+
+ case _: BroadcastHint | _: View | _: With | _: SubqueryAlias =>
+ // Don't traverse down these nodes.
+ // For an existing broadcast hint, there is no point going down (if we do, we either
+ // won't change the structure, or will introduce another broadcast hint that is useless).
+ // The rest (view, with, subquery) indicates different scopes that we shouldn't traverse
+ // down. Note that technically when this rule is executed, we haven't completed view
+ // resolution yet and as a result the view part should be dead code. I'm leaving it here
+ // to be more future proof in case we change the way we do view resolution.
+ recurse = false
+ plan
+
+ case _ =>
+ plan
+ }
+ }
+
+ if ((plan fastEquals newNode) && recurse) {
+ newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
+ } else {
+ newNode
+ }
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) =>
+ applyBroadcastHint(h.child, h.parameters.toSet)
+ }
+ }
+
+ /**
+ * Removes all the hints, used to remove invalid hints provided by the user.
+ * This must be executed after all the other hint rules are executed.
+ */
+ object RemoveAllHints extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ case h: Hint => h.child
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala
deleted file mode 100644
index fda4d1b..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.analysis
-
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
-
-
-/**
- * Collection of rules related to hints. The only hint currently available is broadcast join hint.
- *
- * Note that this is separatedly into two rules because in the future we might introduce new hint
- * rules that have different ordering requirements from broadcast.
- */
-object SubstituteHints {
-
- /**
- * Substitute Hints.
- *
- * The only hint currently available is broadcast join hint.
- *
- * For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
- * relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
- * on top of any relation (that is not aliased differently), subquery, or common table expression
- * that match the specified name.
- *
- * The hint resolution works by recursively traversing down the query plan to find a relation or
- * subquery that matches one of the specified broadcast aliases. The traversal does not go past
- * beyond any existing broadcast hints, subquery aliases.
- *
- * This rule must happen before common table expressions.
- */
- class SubstituteBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] {
- private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
-
- def resolver: Resolver = conf.resolver
-
- private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
- // Whether to continue recursing down the tree
- var recurse = true
-
- val newNode = CurrentOrigin.withOrigin(plan.origin) {
- plan match {
- case r: UnresolvedRelation =>
- val alias = r.alias.getOrElse(r.tableIdentifier.table)
- if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan
- case r: SubqueryAlias =>
- if (toBroadcast.exists(resolver(_, r.alias))) {
- BroadcastHint(plan)
- } else {
- // Don't recurse down subquery aliases if there are no match.
- recurse = false
- plan
- }
- case _: BroadcastHint =>
- // Found a broadcast hint; don't change the plan but also don't recurse down.
- recurse = false
- plan
- case _ =>
- plan
- }
- }
-
- if ((plan fastEquals newNode) && recurse) {
- newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
- } else {
- newNode
- }
- }
-
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
- case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) =>
- applyBroadcastHint(h.child, h.parameters.toSet)
- }
- }
-
- /**
- * Removes all the hints, used to remove invalid hints provided by the user.
- * This must be executed after all the other hint rules are executed.
- */
- object RemoveAllHints extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
- case h: Hint => h.child
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
new file mode 100644
index 0000000..d101e22
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class ResolveHintsSuite extends AnalysisTest {
+ import org.apache.spark.sql.catalyst.analysis.TestRelations._
+
+ test("invalid hints should be ignored") {
+ checkAnalysis(
+ Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
+ testRelation,
+ caseSensitive = false)
+ }
+
+ test("case-sensitive or insensitive parameters") {
+ checkAnalysis(
+ Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
+ BroadcastHint(testRelation),
+ caseSensitive = false)
+
+ checkAnalysis(
+ Hint("MAPJOIN", Seq("table"), table("TaBlE")),
+ BroadcastHint(testRelation),
+ caseSensitive = false)
+
+ checkAnalysis(
+ Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
+ BroadcastHint(testRelation),
+ caseSensitive = true)
+
+ checkAnalysis(
+ Hint("MAPJOIN", Seq("table"), table("TaBlE")),
+ testRelation,
+ caseSensitive = true)
+ }
+
+ test("multiple broadcast hint aliases") {
+ checkAnalysis(
+ Hint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))),
+ Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, None),
+ caseSensitive = false)
+ }
+
+ test("do not traverse past existing broadcast hints") {
+ checkAnalysis(
+ Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 1))),
+ BroadcastHint(testRelation.where('a > 1)).analyze,
+ caseSensitive = false)
+ }
+
+ test("should work for subqueries") {
+ checkAnalysis(
+ Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
+ BroadcastHint(testRelation),
+ caseSensitive = false)
+
+ checkAnalysis(
+ Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
+ BroadcastHint(testRelation),
+ caseSensitive = false)
+
+ // Negative case: if the alias doesn't match, don't match the original table name.
+ checkAnalysis(
+ Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
+ testRelation,
+ caseSensitive = false)
+ }
+
+ test("do not traverse past subquery alias") {
+ checkAnalysis(
+ Hint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)),
+ testRelation.where('a > 1).analyze,
+ caseSensitive = false)
+ }
+
+ test("should work for CTE") {
+ checkAnalysis(
+ CatalystSqlParser.parsePlan(
+ """
+ |WITH ctetable AS (SELECT * FROM table WHERE a > 1)
+ |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
+ """.stripMargin
+ ),
+ BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze,
+ caseSensitive = false)
+ }
+
+ test("should not traverse down CTE") {
+ checkAnalysis(
+ CatalystSqlParser.parsePlan(
+ """
+ |WITH ctetable AS (SELECT * FROM table WHERE a > 1)
+ |SELECT /*+ BROADCAST(table) */ * FROM ctetable
+ """.stripMargin
+ ),
+ testRelation.where('a > 1).select('a).select('a).analyze,
+ caseSensitive = false)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala
deleted file mode 100644
index 9d671f3..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.analysis
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.plans.logical._
-
-class SubstituteHintsSuite extends AnalysisTest {
- import org.apache.spark.sql.catalyst.analysis.TestRelations._
-
- test("invalid hints should be ignored") {
- checkAnalysis(
- Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
- testRelation,
- caseSensitive = false)
- }
-
- test("case-sensitive or insensitive parameters") {
- checkAnalysis(
- Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
- BroadcastHint(testRelation),
- caseSensitive = false)
-
- checkAnalysis(
- Hint("MAPJOIN", Seq("table"), table("TaBlE")),
- BroadcastHint(testRelation),
- caseSensitive = false)
-
- checkAnalysis(
- Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
- BroadcastHint(testRelation),
- caseSensitive = true)
-
- checkAnalysis(
- Hint("MAPJOIN", Seq("table"), table("TaBlE")),
- testRelation,
- caseSensitive = true)
- }
-
- test("multiple broadcast hint aliases") {
- checkAnalysis(
- Hint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))),
- Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, None),
- caseSensitive = false)
- }
-
- test("do not traverse past existing broadcast hints") {
- checkAnalysis(
- Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 1))),
- BroadcastHint(testRelation.where('a > 1)).analyze,
- caseSensitive = false)
- }
-
- test("should work for subqueries") {
- checkAnalysis(
- Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
- BroadcastHint(testRelation),
- caseSensitive = false)
-
- checkAnalysis(
- Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
- BroadcastHint(testRelation),
- caseSensitive = false)
-
- // Negative case: if the alias doesn't match, don't match the original table name.
- checkAnalysis(
- Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
- testRelation,
- caseSensitive = false)
- }
-
- test("do not traverse past subquery alias") {
- checkAnalysis(
- Hint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)),
- testRelation.where('a > 1).analyze,
- caseSensitive = false)
- }
-
- test("should work for CTE") {
- checkAnalysis(
- CatalystSqlParser.parsePlan(
- """
- |WITH ctetable AS (SELECT * FROM table WHERE a > 1)
- |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
- """.stripMargin
- ),
- BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze,
- caseSensitive = false)
- }
-
- test("should not traverse down CTE") {
- checkAnalysis(
- CatalystSqlParser.parsePlan(
- """
- |WITH ctetable AS (SELECT * FROM table WHERE a > 1)
- |SELECT /*+ BROADCAST(table) */ * FROM ctetable
- """.stripMargin
- ),
- testRelation.where('a > 1).select('a).select('a).analyze,
- caseSensitive = false)
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org