You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/05/11 17:00:31 UTC
spark git commit: [SPARK-24172][SQL] we should not apply operator
pushdown to data source v2 many times
Repository: spark
Updated Branches:
refs/heads/master 54032682b -> 928845a42
[SPARK-24172][SQL] we should not apply operator pushdown to data source v2 many times
## What changes were proposed in this pull request?
In `PushDownOperatorsToDataSource`, we use `transformUp` to match `PhysicalOperation` and apply pushdown. This is problematic if we have multiple `Filter` and `Project` above the data source v2 relation.
For example, for a query such as:
```
Project
Filter
DataSourceV2Relation
```
The pattern match will be triggered twice, so operator pushdown will be done twice. This is unnecessary; we can use `mapChildren` to apply pushdown only once.
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <we...@databricks.com>
Closes #21230 from cloud-fan/step2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/928845a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/928845a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/928845a4
Branch: refs/heads/master
Commit: 928845a42230a2c0a318011002a54ad871468b2e
Parents: 5403268
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri May 11 10:00:28 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri May 11 10:00:28 2018 -0700
----------------------------------------------------------------------
.../v2/PushDownOperatorsToDataSource.scala | 15 +++++----------
1 file changed, 5 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/928845a4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index 9293d4f..e894f8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -23,17 +23,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
import org.apache.spark.sql.catalyst.rules.Rule
object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
- override def apply(
- plan: LogicalPlan): LogicalPlan = plan transformUp {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan match {
// PhysicalOperation guarantees that filters are deterministic; no need to check
- case PhysicalOperation(project, newFilters, relation : DataSourceV2Relation) =>
- // merge the filters
- val filters = relation.filters match {
- case Some(existing) =>
- existing ++ newFilters
- case _ =>
- newFilters
- }
+ case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+ assert(relation.filters.isEmpty, "data source v2 should do push down only once.")
val projectAttrs = project.map(_.toAttribute)
val projectSet = AttributeSet(project.flatMap(_.references))
@@ -67,5 +60,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
} else {
filtered
}
+
+ case other => other.mapChildren(apply)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org