You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2020/09/14 08:00:02 UTC

[spark] branch branch-2.4 updated: [SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new d33052b  [SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2
d33052b is described below

commit d33052b8456818a71724638248cfb91f1aacf1d4
Author: mingjial <mi...@google.com>
AuthorDate: Mon Sep 14 15:51:13 2020 +0800

    [SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2
    
    ### What changes were proposed in this pull request?
    
    Override the doCanonicalize function of class DataSourceV2ScanExec.
    
    ### Why are the changes needed?
    
    Query plans that use DataSourceV2 fail to reuse any exchange. This change lets DataSourceV2 plans reuse exchanges, and therefore be optimized, in the same way that DataSourceV1 and Parquet plans are.
    
    Direct reason: the equals function of DataSourceV2ScanExec returns 'false' when comparing two identical V2 scans (same table, outputs and pushed filters).
    
    Actual cause: with the query plan's default doCanonicalize function, the pushed filters of DataSourceV2ScanExec are not canonicalized correctly. Essentially, the expression IDs of the predicate columns are not normalized.
    
    [SPARK-32708](https://issues.apache.org/jira/browse/SPARK-32708#) was not caught by the [tests](https://github.com/apache/spark/blob/5b1b9b39eb612cbf9ec67efd4e364adafcff66c4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala#L392) I previously added for [SPARK-32609], because the issue above occurs only when the same filtered column has different expression IDs (e.g., when joining table t1 with t1).
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    unit test added
    
    Closes #29564 from mingjialiu/branch-2.4.
    
    Authored-by: mingjial <mi...@google.com>
    Signed-off-by: Gengliang Wang <ge...@databricks.com>
---
 .../datasources/v2/DataSourceV2ScanExec.scala      | 12 +++++++++++
 .../spark/sql/sources/v2/DataSourceV2Suite.scala   | 23 ++++++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 9b70eec..2fe563a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
@@ -52,6 +53,17 @@ case class DataSourceV2ScanExec(
     case _ => false
   }
 
+  override def doCanonicalize(): DataSourceV2ScanExec = { // copy with normalized expr IDs
+    DataSourceV2ScanExec(
+      output.map(QueryPlan.normalizeExprId(_, output)), // output attrs normalized vs. output
+      source, // passed through unchanged
+      options, // passed through unchanged
+      QueryPlan.normalizePredicates(
+        pushedFilters, // normalized against the attributes the filters themselves reference
+        AttributeSeq(pushedFilters.flatMap(_.references).distinct)),
+      reader) // passed through unchanged
+  }
+
   override def hashCode(): Int = {
     Seq(output, source, options).hashCode()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index ef0a8bd..92e0ac9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -393,6 +393,29 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-32708: same columns with different ExprIds should be equal after canonicalization ") {
+    def getV2ScanExec(query: DataFrame): DataSourceV2ScanExec = {
+      query.queryExecution.executedPlan.collect {
+        case d: DataSourceV2ScanExec => d
+      }.head // first DataSourceV2ScanExec found in the executed plan
+    }
+
+    val df1 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load() // first load
+    val q1 = df1.select('i).filter('i > 6)
+    val df2 = spark.read.format(classOf[AdvancedDataSourceV2].getName).load() // second load
+    val q2 = df2.select('i).filter('i > 6) // same select/filter as q1, built on the second load
+    val scan1 = getV2ScanExec(q1)
+    val scan2 = getV2ScanExec(q2)
+    assert(scan1.sameResult(scan2)) // identical queries must be treated as the same result
+    assert(scan1.doCanonicalize().equals(scan2.doCanonicalize()))
+
+    val q3 = df2.select('i).filter('i > 5) // differs from q1/q2 only in the filter literal
+    val scan3 = getV2ScanExec(q3)
+    assert(!scan1.sameResult(scan3)) // a different pushed filter must not compare equal
+    assert(!scan1.doCanonicalize().equals(scan3.doCanonicalize()))
+  }
+
 }
 
 class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org