You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/05/11 17:00:31 UTC

spark git commit: [SPARK-24172][SQL] we should not apply operator pushdown to data source v2 many times

Repository: spark
Updated Branches:
  refs/heads/master 54032682b -> 928845a42


[SPARK-24172][SQL] we should not apply operator pushdown to data source v2 many times

## What changes were proposed in this pull request?

In `PushDownOperatorsToDataSource`, we use `transformUp` to match `PhysicalOperation` and apply pushdown. This is problematic if we have multiple `Filter` and `Project` above the data source v2 relation.

e.g. for a query
```
Project
  Filter
    DataSourceV2Relation
```

The pattern match will be triggered twice and we will do operator pushdown twice. This is unnecessary, we can use `mapChildren` to only apply pushdown once.

## How was this patch tested?

existing test

Author: Wenchen Fan <we...@databricks.com>

Closes #21230 from cloud-fan/step2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/928845a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/928845a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/928845a4

Branch: refs/heads/master
Commit: 928845a42230a2c0a318011002a54ad871468b2e
Parents: 5403268
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri May 11 10:00:28 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri May 11 10:00:28 2018 -0700

----------------------------------------------------------------------
 .../v2/PushDownOperatorsToDataSource.scala           | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/928845a4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index 9293d4f..e894f8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -23,17 +23,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
 import org.apache.spark.sql.catalyst.rules.Rule
 
 object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
-  override def apply(
-      plan: LogicalPlan): LogicalPlan = plan transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
     // PhysicalOperation guarantees that filters are deterministic; no need to check
-    case PhysicalOperation(project, newFilters, relation : DataSourceV2Relation) =>
-      // merge the filters
-      val filters = relation.filters match {
-        case Some(existing) =>
-          existing ++ newFilters
-        case _ =>
-          newFilters
-      }
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      assert(relation.filters.isEmpty, "data source v2 should do push down only once.")
 
       val projectAttrs = project.map(_.toAttribute)
       val projectSet = AttributeSet(project.flatMap(_.references))
@@ -67,5 +60,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
       } else {
         filtered
       }
+
+    case other => other.mapChildren(apply)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org