You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/10/28 01:57:12 UTC

spark git commit: [SPARK-22181][SQL] Adds ReplaceExceptWithFilter rule

Repository: spark
Updated Branches:
  refs/heads/master 20eb95e5e -> 01f6ba0e7


[SPARK-22181][SQL] Adds ReplaceExceptWithFilter rule

## What changes were proposed in this pull request?

Adds a new optimisation rule 'ReplaceExceptWithFilter' that replaces the logical Except operator with a Filter operator, and schedules it before the 'ReplaceExceptWithAntiJoin' rule. This way we can avoid an expensive join operation if one or both of the datasets of the Except operation are fully derived from Filters over the same parent.

## How was this patch tested?

The patch is tested locally using spark-shell + unit test.

Author: Sathiya <sa...@polytechnique.edu>

Closes #19451 from sathiyapk/SPARK-22181-optimize-exceptWithFilter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01f6ba0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01f6ba0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01f6ba0e

Branch: refs/heads/master
Commit: 01f6ba0e7a12ef818d56e7d5b1bd889b79f2b57c
Parents: 20eb95e
Author: Sathiya <sa...@polytechnique.edu>
Authored: Fri Oct 27 18:57:08 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Oct 27 18:57:08 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/subquery.scala     |  10 ++
 .../sql/catalyst/optimizer/Optimizer.scala      |   1 +
 .../optimizer/ReplaceExceptWithFilter.scala     | 101 ++++++++++++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala |  15 +++
 .../optimizer/ReplaceOperatorSuite.scala        | 106 ++++++++++++++++++-
 .../test/resources/sql-tests/inputs/except.sql  |  57 ++++++++++
 .../resources/sql-tests/results/except.sql.out  | 105 ++++++++++++++++++
 7 files changed, 394 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01f6ba0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
index c614604..6acc87a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -89,6 +89,16 @@ object SubqueryExpression {
       case _ => false
     }.isDefined
   }
+
+  /**
+   * Returns true when an expression contains a subquery
+   */
+  def hasSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: SubqueryExpression => true
+      case _ => false
+    }.isDefined
+  }
 }
 
 object SubExprUtils extends PredicateHelper {

http://git-wip-us.apache.org/repos/asf/spark/blob/01f6ba0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d829e01..3273a61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -76,6 +76,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       OptimizeSubqueries) ::
     Batch("Replace Operators", fixedPoint,
       ReplaceIntersectWithSemiJoin,
+      ReplaceExceptWithFilter,
       ReplaceExceptWithAntiJoin,
       ReplaceDistinctWithAggregate) ::
     Batch("Aggregate", fixedPoint,

http://git-wip-us.apache.org/repos/asf/spark/blob/01f6ba0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
new file mode 100644
index 0000000..89bfcee
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+
+
+/**
+ * If one or both of the datasets in the logical [[Except]] operator are purely transformed using
+ * [[Filter]], this rule will replace logical [[Except]] operator with a [[Filter]] operator by
+ * flipping the filter condition of the right child.
+ * {{{
+ *   SELECT a1, a2 FROM Tab1 WHERE a2 = 12 EXCEPT SELECT a1, a2 FROM Tab1 WHERE a1 = 5
+ *   ==>  SELECT DISTINCT a1, a2 FROM Tab1 WHERE a2 = 12 AND (a1 is null OR a1 <> 5)
+ * }}}
+ *
+ * Note:
+ * Before flipping the filter condition of the right node, we should:
+ * 1. Combine all its [[Filter]] operators.
+ * 2. Apply InferFiltersFromConstraints rule (to take NULL values in the condition into account).
+ */
+object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!plan.conf.replaceExceptWithFilter) {
+      return plan
+    }
+
+    plan.transform {
+      case Except(left, right) if isEligible(left, right) =>
+        Distinct(Filter(Not(transformCondition(left, skipProject(right))), left))
+    }
+  }
+
+  private def transformCondition(left: LogicalPlan, right: LogicalPlan): Expression = {
+    val filterCondition =
+      InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
+
+    val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap
+
+    filterCondition.transform { case a : AttributeReference => attributeNameMap(a.name) }
+  }
+
+  // TODO: This can be further extended in the future.
+  private def isEligible(left: LogicalPlan, right: LogicalPlan): Boolean = (left, right) match {
+    case (_, right @ (Project(_, _: Filter) | Filter(_, _))) => verifyConditions(left, right)
+    case _ => false
+  }
+
+  private def verifyConditions(left: LogicalPlan, right: LogicalPlan): Boolean = {
+    val leftProjectList = projectList(left)
+    val rightProjectList = projectList(right)
+
+    left.output.size == left.output.map(_.name).distinct.size &&
+      left.find(_.expressions.exists(SubqueryExpression.hasSubquery)).isEmpty &&
+        right.find(_.expressions.exists(SubqueryExpression.hasSubquery)).isEmpty &&
+          Project(leftProjectList, nonFilterChild(skipProject(left))).sameResult(
+            Project(rightProjectList, nonFilterChild(skipProject(right))))
+  }
+
+  private def projectList(node: LogicalPlan): Seq[NamedExpression] = node match {
+    case p: Project => p.projectList
+    case x => x.output
+  }
+
+  private def skipProject(node: LogicalPlan): LogicalPlan = node match {
+    case p: Project => p.child
+    case x => x
+  }
+
+  private def nonFilterChild(plan: LogicalPlan) = plan.find(!_.isInstanceOf[Filter]).getOrElse {
+    throw new IllegalStateException("Leaf node is expected")
+  }
+
+  private def combineFilters(plan: LogicalPlan): LogicalPlan = {
+    @tailrec
+    def iterate(plan: LogicalPlan, acc: LogicalPlan): LogicalPlan = {
+      if (acc.fastEquals(plan)) acc else iterate(acc, CombineFilters(acc))
+    }
+    iterate(plan, CombineFilters(plan))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/01f6ba0e/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 21e4685..5203e88 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -948,6 +948,19 @@ object SQLConf {
       .intConf
       .createWithDefault(10000)
 
+  val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter")
+    .internal()
+    .doc("When true, the apply function of the rule verifies whether the right node of the" +
+      " except operation is of type Filter or Project followed by Filter. If yes, the rule" +
+      " further verifies 1) Excluding the filter operations from the right (as well as the" +
+      " left node, if any) on the top, whether both the nodes evaluates to a same result." +
+      " 2) The left and right nodes don't contain any SubqueryExpressions. 3) The output" +
+      " column names of the left node are distinct. If all the conditions are met, the" +
+      " rule will replace the except operation with a Filter by flipping the filter" +
+      " condition(s) of the right node.")
+    .booleanConf
+    .createWithDefault(true)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -1233,6 +1246,8 @@ class SQLConf extends Serializable with Logging {
 
   def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)
 
+  def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/01f6ba0e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 85988d2..0fa1aae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.{Alias, Not}
 import org.apache.spark.sql.catalyst.expressions.aggregate.First
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -31,6 +32,7 @@ class ReplaceOperatorSuite extends PlanTest {
     val batches =
       Batch("Replace Operators", FixedPoint(100),
         ReplaceDistinctWithAggregate,
+        ReplaceExceptWithFilter,
         ReplaceExceptWithAntiJoin,
         ReplaceIntersectWithSemiJoin,
         ReplaceDeduplicateWithAggregate) :: Nil
@@ -50,6 +52,108 @@ class ReplaceOperatorSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("replace Except with Filter while both the nodes are of type Filter") {
+    val attributeA = 'a.int
+    val attributeB = 'b.int
+
+    val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
+    val table2 = Filter(attributeB === 2, Filter(attributeA === 1, table1))
+    val table3 = Filter(attributeB < 1, Filter(attributeA >= 2, table1))
+
+    val query = Except(table2, table3)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer =
+      Aggregate(table1.output, table1.output,
+        Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
+          (attributeA >= 2 && attributeB < 1)),
+          Filter(attributeB === 2, Filter(attributeA === 1, table1)))).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("replace Except with Filter while only right node is of type Filter") {
+    val attributeA = 'a.int
+    val attributeB = 'b.int
+
+    val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
+    val table2 = Filter(attributeB < 1, Filter(attributeA >= 2, table1))
+
+    val query = Except(table1, table2)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer =
+      Aggregate(table1.output, table1.output,
+        Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
+          (attributeA >= 2 && attributeB < 1)), table1)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("replace Except with Filter while both the nodes are of type Project") {
+    val attributeA = 'a.int
+    val attributeB = 'b.int
+
+    val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
+    val table2 = Project(Seq(attributeA, attributeB), table1)
+    val table3 = Project(Seq(attributeA, attributeB),
+      Filter(attributeB < 1, Filter(attributeA >= 2, table1)))
+
+    val query = Except(table2, table3)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer =
+      Aggregate(table1.output, table1.output,
+        Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
+          (attributeA >= 2 && attributeB < 1)),
+          Project(Seq(attributeA, attributeB), table1))).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("replace Except with Filter while only right node is of type Project") {
+    val attributeA = 'a.int
+    val attributeB = 'b.int
+
+    val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
+    val table2 = Filter(attributeB === 2, Filter(attributeA === 1, table1))
+    val table3 = Project(Seq(attributeA, attributeB),
+      Filter(attributeB < 1, Filter(attributeA >= 2, table1)))
+
+    val query = Except(table2, table3)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer =
+      Aggregate(table1.output, table1.output,
+          Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
+            (attributeA >= 2 && attributeB < 1)),
+            Filter(attributeB === 2, Filter(attributeA === 1, table1)))).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("replace Except with Filter while left node is Project and right node is Filter") {
+    val attributeA = 'a.int
+    val attributeB = 'b.int
+
+    val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
+    val table2 = Project(Seq(attributeA, attributeB),
+      Filter(attributeB < 1, Filter(attributeA >= 2, table1)))
+    val table3 = Filter(attributeB === 2, Filter(attributeA === 1, table1))
+
+    val query = Except(table2, table3)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer =
+      Aggregate(table1.output, table1.output,
+        Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
+          (attributeA === 1 && attributeB === 2)),
+          Project(Seq(attributeA, attributeB),
+            Filter(attributeB < 1, Filter(attributeA >= 2, table1))))).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("replace Except with Left-anti Join") {
     val table1 = LocalRelation('a.int, 'b.int)
     val table2 = LocalRelation('c.int, 'd.int)

http://git-wip-us.apache.org/repos/asf/spark/blob/01f6ba0e/sql/core/src/test/resources/sql-tests/inputs/except.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/except.sql b/sql/core/src/test/resources/sql-tests/inputs/except.sql
new file mode 100644
index 0000000..1d579e6
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/except.sql
@@ -0,0 +1,57 @@
+-- Tests different scenarios of except operation
+create temporary view t1 as select * from values
+  ("one", 1),
+  ("two", 2),
+  ("three", 3),
+  ("one", NULL)
+  as t1(k, v);
+
+create temporary view t2 as select * from values
+  ("one", 1),
+  ("two", 22),
+  ("one", 5),
+  ("one", NULL),
+  (NULL, 5)
+  as t2(k, v);
+
+
+-- Except operation that will be replaced by left anti join
+SELECT * FROM t1 EXCEPT SELECT * FROM t2;
+
+
+-- Except operation that will be replaced by Filter: SPARK-22181
+SELECT * FROM t1 EXCEPT SELECT * FROM t1 where v <> 1 and v <> 2;
+
+
+-- Except operation that will be replaced by Filter: SPARK-22181
+SELECT * FROM t1 where v <> 1 and v <> 22 EXCEPT SELECT * FROM t1 where v <> 2 and v >= 3;
+
+
+-- Except operation that will be replaced by Filter: SPARK-22181
+SELECT t1.* FROM t1, t2 where t1.k = t2.k
+EXCEPT
+SELECT t1.* FROM t1, t2 where t1.k = t2.k and t1.k != 'one';
+
+
+-- Except operation that will be replaced by left anti join
+SELECT * FROM t2 where v >= 1 and v <> 22 EXCEPT SELECT * FROM t1;
+
+
+-- Except operation that will be replaced by left anti join
+SELECT (SELECT min(k) FROM t2 WHERE t2.k = t1.k) min_t2 FROM t1
+MINUS
+SELECT (SELECT min(k) FROM t2) abs_min_t2 FROM t1 WHERE  t1.k = 'one';
+
+
+-- Except operation that will be replaced by left anti join
+SELECT t1.k
+FROM   t1
+WHERE  t1.v <= (SELECT   max(t2.v)
+                FROM     t2
+                WHERE    t2.k = t1.k)
+MINUS
+SELECT t1.k
+FROM   t1
+WHERE  t1.v >= (SELECT   min(t2.v)
+                FROM     t2
+                WHERE    t2.k = t1.k);

http://git-wip-us.apache.org/repos/asf/spark/blob/01f6ba0e/sql/core/src/test/resources/sql-tests/results/except.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/except.sql.out b/sql/core/src/test/resources/sql-tests/results/except.sql.out
new file mode 100644
index 0000000..c9b712d
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/except.sql.out
@@ -0,0 +1,105 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 9
+
+
+-- !query 0
+create temporary view t1 as select * from values
+  ("one", 1),
+  ("two", 2),
+  ("three", 3),
+  ("one", NULL)
+  as t1(k, v)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+create temporary view t2 as select * from values
+  ("one", 1),
+  ("two", 22),
+  ("one", 5),
+  ("one", NULL),
+  (NULL, 5)
+  as t2(k, v)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT * FROM t1 EXCEPT SELECT * FROM t2
+-- !query 2 schema
+struct<k:string,v:int>
+-- !query 2 output
+three	3
+two	2
+
+
+-- !query 3
+SELECT * FROM t1 EXCEPT SELECT * FROM t1 where v <> 1 and v <> 2
+-- !query 3 schema
+struct<k:string,v:int>
+-- !query 3 output
+one	1
+one	NULL
+two	2
+
+
+-- !query 4
+SELECT * FROM t1 where v <> 1 and v <> 22 EXCEPT SELECT * FROM t1 where v <> 2 and v >= 3
+-- !query 4 schema
+struct<k:string,v:int>
+-- !query 4 output
+two	2
+
+
+-- !query 5
+SELECT t1.* FROM t1, t2 where t1.k = t2.k
+EXCEPT
+SELECT t1.* FROM t1, t2 where t1.k = t2.k and t1.k != 'one'
+-- !query 5 schema
+struct<k:string,v:int>
+-- !query 5 output
+one	1
+one	NULL
+
+
+-- !query 6
+SELECT * FROM t2 where v >= 1 and v <> 22 EXCEPT SELECT * FROM t1
+-- !query 6 schema
+struct<k:string,v:int>
+-- !query 6 output
+NULL	5
+one	5
+
+
+-- !query 7
+SELECT (SELECT min(k) FROM t2 WHERE t2.k = t1.k) min_t2 FROM t1
+MINUS
+SELECT (SELECT min(k) FROM t2) abs_min_t2 FROM t1 WHERE  t1.k = 'one'
+-- !query 7 schema
+struct<min_t2:string>
+-- !query 7 output
+NULL
+two
+
+
+-- !query 8
+SELECT t1.k
+FROM   t1
+WHERE  t1.v <= (SELECT   max(t2.v)
+                FROM     t2
+                WHERE    t2.k = t1.k)
+MINUS
+SELECT t1.k
+FROM   t1
+WHERE  t1.v >= (SELECT   min(t2.v)
+                FROM     t2
+                WHERE    t2.k = t1.k)
+-- !query 8 schema
+struct<k:string>
+-- !query 8 output
+two


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org