Posted to commits@spark.apache.org by su...@apache.org on 2022/08/17 22:33:04 UTC

[spark] branch master updated: [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 44f30a04dad [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite
44f30a04dad is described below

commit 44f30a04dad2baa471b505f95c6a29992ee7ca72
Author: Kazuyuki Tanimura <kt...@apple.com>
AuthorDate: Wed Aug 17 15:32:46 2022 -0700

    [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite
    
    ### What changes were proposed in this pull request?
    This PR proposes to add `JDBCWithAQESuite`, i.e. the test cases of `JDBCSuite` run with AQE (Adaptive Query Execution) enabled.
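
    For context, the toggling is done by mixing the
    `DisableAdaptiveExecutionSuite` / `EnableAdaptiveExecutionSuite` traits from
    `org.apache.spark.sql.execution.adaptive` into the suite. As a minimal,
    hypothetical sketch of the mixin idea (not the actual Spark traits, which
    wrap each individual test and also force AQE to apply to plans without
    exchanges), a conf-forcing mixin could look like this:

        import org.apache.spark.SparkConf
        import org.apache.spark.sql.internal.SQLConf
        import org.apache.spark.sql.test.SharedSparkSession

        // Hypothetical mixin: every test in a suite mixing this in runs
        // with AQE enabled in the session conf.
        trait EnableAQEForSuite extends SharedSparkSession {
          override protected def sparkConf: SparkConf =
            super.sparkConf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
        }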
    
    ### Why are the changes needed?
    Currently `JDBCSuite` assumes that AQE is always turned off. We should also test with AQE turned on.
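
    To illustrate the difference (a minimal sketch assuming local mode;
    `spark.sql.adaptive.forceApply` is an internal conf used here only so that
    AQE also wraps exchange-free plans): with AQE on, the root of
    `df.queryExecution.executedPlan` becomes an `AdaptiveSparkPlanExec` wrapper,
    so assertions expecting `WholeStageCodegenExec` at the root no longer hold.

        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

        val spark = SparkSession.builder()
          .master("local[*]")
          .config("spark.sql.adaptive.enabled", "true")
          .config("spark.sql.adaptive.forceApply", "true")
          .getOrCreate()

        // With AQE applied, the executed plan is wrapped in AdaptiveSparkPlanExec
        // rather than rooted at WholeStageCodegenExec.
        val plan = spark.range(10).filter("id > 5").queryExecution.executedPlan
        assert(plan.isInstanceOf[AdaptiveSparkPlanExec])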
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added AQE-enabled versions of the tests alongside the existing non-AQE versions.
    
    Closes #37544 from kazuyukitanimura/SPARK-40110.
    
    Authored-by: Kazuyuki Tanimura <kt...@apple.com>
    Signed-off-by: Chao Sun <su...@apple.com>
---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 32 ++++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b87fee6cec2..8eda0c288a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
 import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation, JdbcUtils}
@@ -44,7 +45,8 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-class JDBCSuite extends QueryTest with SharedSparkSession {
+class JDBCSuite extends QueryTest with SharedSparkSession
+  with AdaptiveSparkPlanHelper with DisableAdaptiveExecutionSuite {
   import testImplicits._
 
   val url = "jdbc:h2:mem:testdb0"
@@ -298,10 +300,15 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     val parentPlan = df.queryExecution.executedPlan
     // Check if SparkPlan Filter is removed in a physical plan and
     // the plan only has PhysicalRDD to scan JDBCRelation.
-    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-    assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
-    assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
+    val child = if (df.sqlContext.conf.adaptiveExecutionEnabled) {
+      assert(parentPlan.isInstanceOf[AdaptiveSparkPlanExec])
+      parentPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+    } else {
+      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+      parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec].child
+    }
+    assert(child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
+    assert(child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
     df
   }
 
@@ -309,9 +316,14 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     val parentPlan = df.queryExecution.executedPlan
     // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
     // cannot compile given predicates.
-    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-    assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
+    val child = if (df.sqlContext.conf.adaptiveExecutionEnabled) {
+      assert(parentPlan.isInstanceOf[AdaptiveSparkPlanExec])
+      parentPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+    } else {
+      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+      parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec].child
+    }
+    assert(child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
     df
   }
 
@@ -1767,7 +1779,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
 
     def getRowCount(df: DataFrame): Long = {
       val queryExecution = df.queryExecution
-      val rawPlan = queryExecution.executedPlan.collect {
+      val rawPlan = collect(queryExecution.executedPlan) {
         case p: DataSourceScanExec => p
       } match {
         case Seq(p) => p
@@ -1964,3 +1976,5 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     }
   }
 }
+
+class JDBCWithAQESuite extends JDBCSuite with EnableAdaptiveExecutionSuite
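
With this pattern, `JDBCWithAQESuite` re-runs every test inherited from
`JDBCSuite` with AQE enabled, while the shared assertion helpers unwrap the
`AdaptiveSparkPlanExec` root and use `collect` from `AdaptiveSparkPlanHelper`,
which, unlike `SparkPlan.collect`, descends into the adaptively re-planned
subtree. Assuming a standard Spark development checkout, one way to run just
the new suite would be:

    build/sbt "sql/testOnly *JDBCWithAQESuite"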

