You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by su...@apache.org on 2022/08/17 22:33:04 UTC
[spark] branch master updated: [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite
This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 44f30a04dad [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite
44f30a04dad is described below
commit 44f30a04dad2baa471b505f95c6a29992ee7ca72
Author: Kazuyuki Tanimura <kt...@apple.com>
AuthorDate: Wed Aug 17 15:32:46 2022 -0700
[SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite
### What changes were proposed in this pull request?
This PR proposes to add `JDBCWithAQESuite` i.e. test cases of `JDBCSuite` with AQE (Adaptive Query Execution) enabled.
### Why are the changes needed?
Currently `JDBCSuite` assumes that AQE is always turned off. We should also test with AQE turned on
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added the AQE version tests along with the non AQE version
Closes #37544 from kazuyukitanimura/SPARK-40110.
Authored-by: Kazuyuki Tanimura <kt...@apple.com>
Signed-off-by: Chao Sun <su...@apple.com>
---
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 32 ++++++++++++++++------
1 file changed, 23 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b87fee6cec2..8eda0c288a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation, JdbcUtils}
@@ -44,7 +45,8 @@ import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-class JDBCSuite extends QueryTest with SharedSparkSession {
+class JDBCSuite extends QueryTest with SharedSparkSession
+ with AdaptiveSparkPlanHelper with DisableAdaptiveExecutionSuite {
import testImplicits._
val url = "jdbc:h2:mem:testdb0"
@@ -298,10 +300,15 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is removed in a physical plan and
// the plan only has PhysicalRDD to scan JDBCRelation.
- assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
- val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
- assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
- assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
+ val child = if (df.sqlContext.conf.adaptiveExecutionEnabled) {
+ assert(parentPlan.isInstanceOf[AdaptiveSparkPlanExec])
+ parentPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ } else {
+ assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+ parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec].child
+ }
+ assert(child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
+ assert(child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
df
}
@@ -309,9 +316,14 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
// cannot compile given predicates.
- assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
- val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
- assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
+ val child = if (df.sqlContext.conf.adaptiveExecutionEnabled) {
+ assert(parentPlan.isInstanceOf[AdaptiveSparkPlanExec])
+ parentPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ } else {
+ assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+ parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec].child
+ }
+ assert(child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
df
}
@@ -1767,7 +1779,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
def getRowCount(df: DataFrame): Long = {
val queryExecution = df.queryExecution
- val rawPlan = queryExecution.executedPlan.collect {
+ val rawPlan = collect(queryExecution.executedPlan) {
case p: DataSourceScanExec => p
} match {
case Seq(p) => p
@@ -1964,3 +1976,5 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
}
}
}
+
+class JDBCWithAQESuite extends JDBCSuite with EnableAdaptiveExecutionSuite
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org