You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/23 13:43:57 UTC
[spark] branch branch-3.3 updated: [SPARK-37483][SQL][FOLLOWUP] Rename `pushedTopN` to `PushedTopN` and improve JDBCV2Suite
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 737077a [SPARK-37483][SQL][FOLLOWUP] Rename `pushedTopN` to `PushedTopN` and improve JDBCV2Suite
737077a is described below
commit 737077af04c1a62a99bfd5dba731174dd29f97f4
Author: Jiaan Geng <be...@163.com>
AuthorDate: Wed Mar 23 21:40:44 2022 +0800
[SPARK-37483][SQL][FOLLOWUP] Rename `pushedTopN` to `PushedTopN` and improve JDBCV2Suite
### What changes were proposed in this pull request?
This PR fixes three issues.
**First**, create method `checkPushedInfo` and `checkSortRemoved` to reuse code.
**Second**, remove method `checkPushedLimit`, because `checkPushedInfo` can cover it.
**Third**, rename `pushedTopN` to `PushedTopN`, so as to be consistent with the other pushed information.
### Why are the changes needed?
Reuse code and make the pushed information more correct.
### Does this PR introduce _any_ user-facing change?
'No'. This change only renames internal pushed-down information and improves the tests.
### How was this patch tested?
Adjust existing tests.
Closes #35921 from beliefer/SPARK-37483_followup.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 4fe55c522d7bc34487f21d0e69fc7c230d61a3bf)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/execution/DataSourceScanExec.scala | 2 +-
.../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 517 ++++++---------------
2 files changed, 132 insertions(+), 387 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index e6de7d0..5067cd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -148,7 +148,7 @@ case class RowDataSourceScanExec(
val pushedTopN =
s"ORDER BY ${seqToString(pushedDownOperators.sortValues.map(_.describe()))}" +
s" LIMIT ${pushedDownOperators.limit.get}"
- Some("pushedTopN" -> pushedTopN)
+ Some("PushedTopN" -> pushedTopN)
} else {
pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 31fdb02..e7e9174 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sort}
-import org.apache.spark.sql.connector.expressions.{FieldReference, NullOrdering, SortDirection, SortValue}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.functions.{avg, count, count_distinct, lit, not, sum, udf, when}
@@ -110,13 +109,20 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(sql("SELECT name, id FROM h2.test.people"), Seq(Row("fred", 1), Row("mary", 2)))
}
+ private def checkPushedInfo(df: DataFrame, expectedPlanFragment: String): Unit = {
+ df.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ checkKeywordsExistsInExplain(df, expectedPlanFragment)
+ }
+ }
+
// TABLESAMPLE ({integer_expression | decimal_expression} PERCENT) and
// TABLESAMPLE (BUCKET integer_expression OUT OF integer_expression)
// are tested in JDBC dialect tests because TABLESAMPLE is not supported by all the DBMS
test("TABLESAMPLE (integer_expression ROWS) is the same as LIMIT") {
val df = sql("SELECT NAME FROM h2.test.employee TABLESAMPLE (3 ROWS)")
checkSchemaNames(df, Seq("NAME"))
- checkPushedLimit(df, Some(3))
+ checkPushedInfo(df, "PushedFilters: [], PushedLimit: LIMIT 3, ")
checkAnswer(df, Seq(Row("amy"), Row("alex"), Row("cathy")))
}
@@ -130,7 +136,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
test("simple scan with LIMIT") {
val df1 = spark.read.table("h2.test.employee")
.where($"dept" === 1).limit(1)
- checkPushedLimit(df1, Some(1))
+ checkPushedInfo(df1,
+ "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, ")
checkAnswer(df1, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
val df2 = spark.read
@@ -141,19 +148,22 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.table("h2.test.employee")
.filter($"dept" > 1)
.limit(1)
- checkPushedLimit(df2, Some(1))
+ checkPushedInfo(df2,
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], PushedLimit: LIMIT 1, ")
checkAnswer(df2, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
val df3 = sql("SELECT name FROM h2.test.employee WHERE dept > 1 LIMIT 1")
checkSchemaNames(df3, Seq("NAME"))
- checkPushedLimit(df3, Some(1))
+ checkPushedInfo(df3,
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], PushedLimit: LIMIT 1, ")
checkAnswer(df3, Seq(Row("alex")))
val df4 = spark.read
.table("h2.test.employee")
.groupBy("DEPT").sum("SALARY")
.limit(1)
- checkPushedLimit(df4, None)
+ checkPushedInfo(df4,
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT], ")
checkAnswer(df4, Seq(Row(1, 19000.00)))
val name = udf { (x: String) => x.matches("cat|dav|amy") }
@@ -164,24 +174,18 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.filter(name($"shortName"))
.limit(1)
// LIMIT is pushed down only if all the filters are pushed down
- checkPushedLimit(df5, None)
+ checkPushedInfo(df5, "PushedFilters: [], ")
checkAnswer(df5, Seq(Row(10000.00, 1000.0, "amy")))
}
- private def checkPushedLimit(df: DataFrame, limit: Option[Int] = None,
- sortValues: Seq[SortValue] = Nil): Unit = {
- df.queryExecution.optimizedPlan.collect {
- case relation: DataSourceV2ScanRelation => relation.scan match {
- case v1: V1ScanWrapper =>
- assert(v1.pushedDownOperators.limit === limit)
- assert(v1.pushedDownOperators.sortValues === sortValues)
- }
+ private def checkSortRemoved(df: DataFrame, removed: Boolean = true): Unit = {
+ val sorts = df.queryExecution.optimizedPlan.collect {
+ case s: Sort => s
}
- if (sortValues.nonEmpty) {
- val sorts = df.queryExecution.optimizedPlan.collect {
- case s: Sort => s
- }
+ if (removed) {
assert(sorts.isEmpty)
+ } else {
+ assert(sorts.nonEmpty)
}
}
@@ -190,12 +194,16 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.table("h2.test.employee")
.sort("salary")
.limit(1)
- checkPushedLimit(df1, Some(1), createSortValues())
+ checkSortRemoved(df1)
+ checkPushedInfo(df1,
+ "PushedFilters: [], PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ")
checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
val df2 = spark.read.table("h2.test.employee")
.where($"dept" === 1).orderBy($"salary").limit(1)
- checkPushedLimit(df2, Some(1), createSortValues())
+ checkSortRemoved(df2)
+ checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], " +
+ "PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ")
checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
val df3 = spark.read
@@ -207,19 +215,23 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.filter($"dept" > 1)
.orderBy($"salary".desc)
.limit(1)
- checkPushedLimit(
- df3, Some(1), createSortValues(SortDirection.DESCENDING, NullOrdering.NULLS_LAST))
+ checkSortRemoved(df3)
+ checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " +
+ "PushedTopN: ORDER BY [salary DESC NULLS LAST] LIMIT 1, ")
checkAnswer(df3, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
val df4 =
sql("SELECT name FROM h2.test.employee WHERE dept > 1 ORDER BY salary NULLS LAST LIMIT 1")
checkSchemaNames(df4, Seq("NAME"))
- checkPushedLimit(df4, Some(1), createSortValues(nullOrdering = NullOrdering.NULLS_LAST))
+ checkSortRemoved(df4)
+ checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " +
+ "PushedTopN: ORDER BY [salary ASC NULLS LAST] LIMIT 1, ")
checkAnswer(df4, Seq(Row("david")))
val df5 = spark.read.table("h2.test.employee")
.where($"dept" === 1).orderBy($"salary")
- checkPushedLimit(df5, None)
+ checkSortRemoved(df5, false)
+ checkPushedInfo(df5, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ")
checkAnswer(df5,
Seq(Row(1, "cathy", 9000.00, 1200.0, false), Row(1, "amy", 10000.00, 1000.0, true)))
@@ -228,7 +240,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.groupBy("DEPT").sum("SALARY")
.orderBy("DEPT")
.limit(1)
- checkPushedLimit(df6)
+ checkSortRemoved(df6, false)
+ checkPushedInfo(df6, "PushedAggregates: [SUM(SALARY)]," +
+ " PushedFilters: [], PushedGroupByColumns: [DEPT], ")
checkAnswer(df6, Seq(Row(1, 19000.00)))
val name = udf { (x: String) => x.matches("cat|dav|amy") }
@@ -240,145 +254,69 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.sort($"SALARY".desc)
.limit(1)
// LIMIT is pushed down only if all the filters are pushed down
- checkPushedLimit(df7)
+ checkSortRemoved(df7, false)
+ checkPushedInfo(df7, "PushedFilters: [], ")
checkAnswer(df7, Seq(Row(10000.00, 1000.0, "amy")))
val df8 = spark.read
.table("h2.test.employee")
.sort(sub($"NAME"))
.limit(1)
- checkPushedLimit(df8)
+ checkSortRemoved(df8, false)
+ checkPushedInfo(df8, "PushedFilters: [], ")
checkAnswer(df8, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
}
- private def createSortValues(
- sortDirection: SortDirection = SortDirection.ASCENDING,
- nullOrdering: NullOrdering = NullOrdering.NULLS_FIRST): Seq[SortValue] = {
- Seq(SortValue(FieldReference("salary"), sortDirection, nullOrdering))
- }
-
test("scan with filter push-down") {
val df = spark.table("h2.test.people").filter($"id" > 1)
-
checkFiltersRemoved(df)
-
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: [ID IS NOT NULL, ID > 1]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
-
+ checkPushedInfo(df, "PushedFilters: [ID IS NOT NULL, ID > 1], ")
checkAnswer(df, Row("mary", 2))
val df2 = spark.table("h2.test.employee").filter($"name".isin("amy", "cathy"))
-
checkFiltersRemoved(df2)
-
- df2.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: [NAME IN ('amy', 'cathy')]"
- checkKeywordsExistsInExplain(df2, expected_plan_fragment)
- }
-
+ checkPushedInfo(df2, "PushedFilters: [NAME IN ('amy', 'cathy')]")
checkAnswer(df2, Seq(Row(1, "amy", 10000, 1000, true), Row(1, "cathy", 9000, 1200, false)))
val df3 = spark.table("h2.test.employee").filter($"name".startsWith("a"))
-
checkFiltersRemoved(df3)
-
- df3.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: [NAME IS NOT NULL, NAME LIKE 'a%']"
- checkKeywordsExistsInExplain(df3, expected_plan_fragment)
- }
-
+ checkPushedInfo(df3, "PushedFilters: [NAME IS NOT NULL, NAME LIKE 'a%']")
checkAnswer(df3, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "alex", 12000, 1200, false)))
val df4 = spark.table("h2.test.employee").filter($"is_manager")
-
checkFiltersRemoved(df4)
-
- df4.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true]"
- checkKeywordsExistsInExplain(df4, expected_plan_fragment)
- }
-
+ checkPushedInfo(df4, "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true]")
checkAnswer(df4, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "david", 10000, 1300, true),
Row(6, "jen", 12000, 1200, true)))
val df5 = spark.table("h2.test.employee").filter($"is_manager".and($"salary" > 10000))
-
checkFiltersRemoved(df5)
-
- df5.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: [IS_MANAGER IS NOT NULL, SALARY IS NOT NULL, " +
- "IS_MANAGER = true, SALARY > 10000.00]"
- checkKeywordsExistsInExplain(df5, expected_plan_fragment)
- }
-
+ checkPushedInfo(df5, "PushedFilters: [IS_MANAGER IS NOT NULL, SALARY IS NOT NULL, " +
+ "IS_MANAGER = true, SALARY > 10000.00]")
checkAnswer(df5, Seq(Row(6, "jen", 12000, 1200, true)))
val df6 = spark.table("h2.test.employee").filter($"is_manager".or($"salary" > 10000))
-
checkFiltersRemoved(df6)
-
- df6.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: [(IS_MANAGER = true) OR (SALARY > 10000.00)], "
- checkKeywordsExistsInExplain(df6, expected_plan_fragment)
- }
-
+ checkPushedInfo(df6, "PushedFilters: [(IS_MANAGER = true) OR (SALARY > 10000.00)], ")
checkAnswer(df6, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "alex", 12000, 1200, false),
Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
val df7 = spark.table("h2.test.employee").filter(not($"is_manager") === true)
-
checkFiltersRemoved(df7)
-
- df7.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: [IS_MANAGER IS NOT NULL, NOT (IS_MANAGER = true)], "
- checkKeywordsExistsInExplain(df7, expected_plan_fragment)
- }
-
+ checkPushedInfo(df7, "PushedFilters: [IS_MANAGER IS NOT NULL, NOT (IS_MANAGER = true)], ")
checkAnswer(df7, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "alex", 12000, 1200, false)))
val df8 = spark.table("h2.test.employee").filter($"is_manager" === true)
-
checkFiltersRemoved(df8)
-
- df8.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true], "
- checkKeywordsExistsInExplain(df8, expected_plan_fragment)
- }
-
+ checkPushedInfo(df8, "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true], ")
checkAnswer(df8, Seq(Row(1, "amy", 10000, 1000, true),
Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
val df9 = spark.table("h2.test.employee")
.filter(when($"dept" > 1, true).when($"is_manager", false).otherwise($"dept" > 3))
-
checkFiltersRemoved(df9)
-
- df9.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: [CASE WHEN DEPT > 1 THEN TRUE WHEN IS_MANAGER = true THEN FALSE" +
- " ELSE DEPT > 3 END], "
- checkKeywordsExistsInExplain(df9, expected_plan_fragment)
- }
-
+ checkPushedInfo(df9, "PushedFilters: [CASE WHEN DEPT > 1 THEN TRUE " +
+ "WHEN IS_MANAGER = true THEN FALSE ELSE DEPT > 3 END], ")
checkAnswer(df9, Seq(Row(2, "alex", 12000, 1200, false),
Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
}
@@ -387,19 +325,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
Seq(false, true).foreach { ansiMode =>
withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiMode.toString) {
val df = spark.table("h2.test.people").filter($"id" + 1 > 1)
-
checkFiltersRemoved(df, ansiMode)
-
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment = if (ansiMode) {
- "PushedFilters: [ID IS NOT NULL, (ID + 1) > 1]"
- } else {
- "PushedFilters: [ID IS NOT NULL]"
- }
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
+ val expectedPlanFragment = if (ansiMode) {
+ "PushedFilters: [ID IS NOT NULL, (ID + 1) > 1]"
+ } else {
+ "PushedFilters: [ID IS NOT NULL]"
}
-
+ checkPushedInfo(df, expectedPlanFragment)
checkAnswer(df, Seq(Row("fred", 1), Row("mary", 2)))
val df2 = spark.table("h2.test.people").filter($"id" + Int.MaxValue > 1)
@@ -432,18 +364,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|""".stripMargin)
checkFiltersRemoved(df3, ansiMode)
-
- df3.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment = if (ansiMode) {
- "PushedFilters: [(CASE WHEN SALARY > 10000.00 THEN BONUS" +
- " ELSE BONUS + 200.0 END) > 1200.0]"
- } else {
- "PushedFilters: []"
- }
- checkKeywordsExistsInExplain(df3, expected_plan_fragment)
+ val expectedPlanFragment3 = if (ansiMode) {
+ "PushedFilters: [(CASE WHEN SALARY > 10000.00 THEN BONUS" +
+ " ELSE BONUS + 200.0 END) > 1200.0]"
+ } else {
+ "PushedFilters: []"
}
-
+ checkPushedInfo(df3, expectedPlanFragment3)
checkAnswer(df3,
Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "david", 10000, 1300, true)))
}
@@ -587,14 +514,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
" group by DePt")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [MAX(SALARY), AVG(BONUS)], " +
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), AVG(BONUS)], " +
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
+ "PushedGroupByColumns: [DEPT], ")
checkAnswer(df, Seq(Row(10000, 1100.0), Row(12000, 1250.0), Row(12000, 1200.0)))
}
@@ -613,14 +535,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val df = sql("select MAX(ID), AVG(ID) FROM h2.test.people where id > 0")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [MAX(ID), AVG(ID)], " +
- "PushedFilters: [ID IS NOT NULL, ID > 0], " +
- "PushedGroupByColumns: [], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [MAX(ID), AVG(ID)], " +
+ "PushedFilters: [ID IS NOT NULL, ID > 0], " +
+ "PushedGroupByColumns: [], ")
checkAnswer(df, Seq(Row(2, 1.5)))
}
@@ -650,42 +567,28 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"PushedAggregates: [MAX(SALARY)]"
checkKeywordsExistsInExplain(df, expected_plan_fragment)
}
+ checkPushedInfo(df, "PushedAggregates: [MAX(SALARY)]")
checkAnswer(df, Seq(Row(12001)))
}
test("scan with aggregate push-down: COUNT(*)") {
val df = sql("select COUNT(*) FROM h2.test.employee")
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [COUNT(*)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [COUNT(*)]")
checkAnswer(df, Seq(Row(5)))
}
test("scan with aggregate push-down: COUNT(col)") {
val df = sql("select COUNT(DEPT) FROM h2.test.employee")
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [COUNT(DEPT)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [COUNT(DEPT)]")
checkAnswer(df, Seq(Row(5)))
}
test("scan with aggregate push-down: COUNT(DISTINCT col)") {
val df = sql("select COUNT(DISTINCT DEPT) FROM h2.test.employee")
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [COUNT(DISTINCT DEPT)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [COUNT(DISTINCT DEPT)]")
checkAnswer(df, Seq(Row(3)))
}
@@ -704,52 +607,30 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
test("scan with aggregate push-down: SUM without filer and group by") {
val df = sql("SELECT SUM(SALARY) FROM h2.test.employee")
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [SUM(SALARY)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [SUM(SALARY)]")
checkAnswer(df, Seq(Row(53000)))
}
test("scan with aggregate push-down: DISTINCT SUM without filer and group by") {
val df = sql("SELECT SUM(DISTINCT SALARY) FROM h2.test.employee")
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [SUM(DISTINCT SALARY)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [SUM(DISTINCT SALARY)]")
checkAnswer(df, Seq(Row(31000)))
}
test("scan with aggregate push-down: SUM with group by") {
val df = sql("SELECT SUM(SALARY) FROM h2.test.employee GROUP BY DEPT")
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [SUM(SALARY)], " +
- "PushedFilters: [], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [SUM(SALARY)], " +
+ "PushedFilters: [], PushedGroupByColumns: [DEPT], ")
checkAnswer(df, Seq(Row(19000), Row(22000), Row(12000)))
}
test("scan with aggregate push-down: DISTINCT SUM with group by") {
val df = sql("SELECT SUM(DISTINCT SALARY) FROM h2.test.employee GROUP BY DEPT")
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [SUM(DISTINCT SALARY)], " +
- "PushedFilters: [], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [SUM(DISTINCT SALARY)], " +
+ "PushedFilters: [], PushedGroupByColumns: [DEPT]")
checkAnswer(df, Seq(Row(19000), Row(22000), Row(12000)))
}
@@ -758,14 +639,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
" group by DEPT, NAME")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
- "PushedGroupByColumns: [DEPT, NAME], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT, NAME]")
checkAnswer(df, Seq(Row(9000, 1200), Row(12000, 1200), Row(10000, 1300),
Row(10000, 1000), Row(12000, 1200)))
}
@@ -778,14 +653,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}
assert(filters1.isEmpty)
checkAggregateRemoved(df1)
- df1.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [MAX(SALARY)], " +
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
- "PushedGroupByColumns: [DEPT, NAME], "
- checkKeywordsExistsInExplain(df1, expected_plan_fragment)
- }
+ checkPushedInfo(df1, "PushedAggregates: [MAX(SALARY)], " +
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT, NAME]")
checkAnswer(df1, Seq(Row("1#amy", 10000), Row("1#cathy", 9000), Row("2#alex", 12000),
Row("2#david", 10000), Row("6#jen", 12000)))
@@ -796,30 +665,16 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}
assert(filters2.isEmpty)
checkAggregateRemoved(df2)
- df2.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
- "PushedGroupByColumns: [DEPT, NAME], "
- checkKeywordsExistsInExplain(df2, expected_plan_fragment)
- }
+ checkPushedInfo(df2, "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT, NAME]")
checkAnswer(df2, Seq(Row("1#amy", 11000), Row("1#cathy", 10200), Row("2#alex", 13200),
Row("2#david", 11300), Row("6#jen", 13200)))
val df3 = sql("select concat_ws('#', DEPT, NAME), MAX(SALARY) + MIN(BONUS)" +
" FROM h2.test.employee where dept > 0 group by concat_ws('#', DEPT, NAME)")
- val filters3 = df3.queryExecution.optimizedPlan.collect {
- case f: Filter => f
- }
- assert(filters3.isEmpty)
+ checkFiltersRemoved(df3)
checkAggregateRemoved(df3, false)
- df3.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], "
- checkKeywordsExistsInExplain(df3, expected_plan_fragment)
- }
+ checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], ")
checkAnswer(df3, Seq(Row("1#amy", 11000), Row("1#cathy", 10200), Row("2#alex", 13200),
Row("2#david", 11300), Row("6#jen", 13200)))
}
@@ -827,19 +682,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
test("scan with aggregate push-down: with having clause") {
val df = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
" group by DEPT having MIN(BONUS) > 1000")
- val filters = df.queryExecution.optimizedPlan.collect {
- case f: Filter => f // filter over aggregate not push down
- }
- assert(filters.nonEmpty)
+ // filter over aggregate not push down
+ checkFiltersRemoved(df, false)
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
checkAnswer(df, Seq(Row(12000, 1200), Row(12000, 1200)))
}
@@ -848,14 +695,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.groupBy($"DEPT")
.min("SALARY").as("total")
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [MIN(SALARY)], " +
- "PushedFilters: [], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [MIN(SALARY)], " +
+ "PushedFilters: [], PushedGroupByColumns: [DEPT]")
checkAnswer(df, Seq(Row(1, 9000), Row(2, 10000), Row(6, 12000)))
}
@@ -867,19 +708,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.agg(sum($"SALARY").as("total"))
.filter($"total" > 1000)
.orderBy($"total")
- val filters = query.queryExecution.optimizedPlan.collect {
- case f: Filter => f
- }
- assert(filters.nonEmpty) // filter over aggregate not pushed down
- checkAggregateRemoved(df)
- query.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [SUM(SALARY)], " +
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(query, expected_plan_fragment)
- }
+ checkFiltersRemoved(query, false)// filter over aggregate not pushed down
+ checkAggregateRemoved(query)
+ checkPushedInfo(query, "PushedAggregates: [SUM(SALARY)], " +
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
checkAnswer(query, Seq(Row(6, 12000), Row(1, 19000), Row(2, 22000)))
}
@@ -888,12 +720,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val decrease = udf { (x: Double, y: Double) => x - y }
val query = df.select(decrease(sum($"SALARY"), sum($"BONUS")).as("value"))
checkAggregateRemoved(query)
- query.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [SUM(SALARY), SUM(BONUS)]"
- checkKeywordsExistsInExplain(query, expected_plan_fragment)
- }
+ checkPushedInfo(query, "PushedAggregates: [SUM(SALARY), SUM(BONUS)], ")
checkAnswer(query, Seq(Row(47100.0)))
}
@@ -915,14 +742,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
" group by DePt")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [VAR_POP(BONUS), VAR_SAMP(BONUS)], " +
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [VAR_POP(BONUS), VAR_SAMP(BONUS)], " +
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
}
@@ -931,14 +752,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
" where dept > 0 group by DePt")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [STDDEV_POP(BONUS), STDDEV_SAMP(BONUS)], " +
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [STDDEV_POP(BONUS), STDDEV_SAMP(BONUS)], " +
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
checkAnswer(df, Seq(Row(100d, 141.4213562373095d), Row(50d, 70.71067811865476d), Row(0d, null)))
}
@@ -947,14 +762,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
" FROM h2.test.employee where dept > 0 group by DePt")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " +
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " +
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
}
@@ -963,14 +772,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
" group by DePt")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [CORR(BONUS, BONUS)], " +
- "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [CORR(BONUS, BONUS)], " +
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
}
@@ -1032,15 +835,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|FROM h2.test.employee GROUP BY DEPT
""".stripMargin)
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00)" +
- " THEN SALARY ELSE 0.00 END), COUNT(CAS..., " +
- "PushedFilters: [], " +
- "PushedGroupByColumns: [DEPT], "
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df,
+ "PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00)" +
+ " THEN SALARY ELSE 0.00 END), COUNT(CAS..., " +
+ "PushedFilters: [], " +
+ "PushedGroupByColumns: [DEPT], ")
checkAnswer(df, Seq(Row(1, 1, 1, 1, 1, 0d, 12000d, 0d, 12000d, 12000d, 0d, 0d, 2, 0d),
Row(2, 2, 2, 2, 2, 0d, 10000d, 0d, 10000d, 10000d, 0d, 0d, 2, 0d),
Row(2, 2, 2, 2, 2, 0d, 12000d, 0d, 12000d, 12000d, 0d, 0d, 3, 0d)))
@@ -1051,17 +850,14 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiMode.toString) {
val df = sql("SELECT SUM(2147483647 + DEPT) FROM h2.test.employee")
checkAggregateRemoved(df, ansiMode)
- val expected_plan_fragment = if (ansiMode) {
+ val expectedPlanFragment = if (ansiMode) {
"PushedAggregates: [SUM(2147483647 + DEPT)], " +
"PushedFilters: [], " +
"PushedGroupByColumns: []"
} else {
"PushedFilters: []"
}
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, expectedPlanFragment)
if (ansiMode) {
val e = intercept[SparkException] {
checkAnswer(df, Seq(Row(-10737418233L)))
@@ -1080,12 +876,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val decrease = udf { (x: Double, y: Double) => x - y }
val query = df.select(sum(decrease($"SALARY", $"BONUS")).as("value"))
checkAggregateRemoved(query, false)
- query.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedFilters: []"
- checkKeywordsExistsInExplain(query, expected_plan_fragment)
- }
+ checkPushedInfo(query, "PushedFilters: []")
checkAnswer(query, Seq(Row(47100.0)))
}
@@ -1121,12 +912,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(sql("SELECT `dept id` FROM h2.test.dept"), Seq(Row(1), Row(2)))
val df = sql("SELECT COUNT(`dept id`) FROM h2.test.dept")
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [COUNT(`dept id`)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [COUNT(`dept id`)]")
checkAnswer(df, Seq(Row(2)))
}
@@ -1135,12 +921,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(sql("SELECT `名` FROM h2.test.person"), Seq(Row(1), Row(2)))
val df = sql("SELECT COUNT(`名`) FROM h2.test.person")
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [COUNT(`名`)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [COUNT(`名`)]")
checkAnswer(df, Seq(Row(2)))
// scalastyle:on
}
@@ -1154,12 +935,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.table("h2.test.employee")
.agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]")
checkAnswer(df, Seq(Row(53000.00, 10600.000000, 5)))
val df2 = spark.read
@@ -1171,12 +947,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.groupBy($"name")
.agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]")
checkAnswer(df2, Seq(
Row("alex", 12000.00, 12000.000000, 1),
Row("amy", 10000.00, 10000.000000, 1),
@@ -1194,12 +965,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.table("h2.test.employee")
.agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
checkAggregateRemoved(df, false)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]")
checkAnswer(df, Seq(Row(53000.00, 10600.000000, 5)))
val df2 = spark.read
@@ -1211,12 +977,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.groupBy($"name")
.agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
checkAggregateRemoved(df, false)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expected_plan_fragment =
- "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]"
- checkKeywordsExistsInExplain(df, expected_plan_fragment)
- }
+ checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]")
checkAnswer(df2, Seq(
Row("alex", 12000.00, 12000.000000, 1),
Row("amy", 10000.00, 10000.000000, 1),
@@ -1240,12 +1001,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.agg(sum($"mySalary").as("total"))
.filter($"total" > 1000)
checkAggregateRemoved(df)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expectedPlanFragment =
- "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
- checkKeywordsExistsInExplain(df, expectedPlanFragment)
- }
+ checkPushedInfo(df,
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]")
checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
val df2 = spark.table("h2.test.employee")
@@ -1254,12 +1011,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.agg(sum($"mySalary").as("total"))
.filter($"total" > 1000)
checkAggregateRemoved(df2)
- df2.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expectedPlanFragment =
- "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
- checkKeywordsExistsInExplain(df2, expectedPlanFragment)
- }
+ checkPushedInfo(df2,
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]")
checkAnswer(df2, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
}
@@ -1275,12 +1028,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.agg(sum($"mySalary").as("total"))
.filter($"total" > 1000)
checkAggregateRemoved(df, false)
- df.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expectedPlanFragment =
- "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]"
- checkKeywordsExistsInExplain(df, expectedPlanFragment)
- }
+ checkPushedInfo(df,
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]")
checkAnswer(df, Seq(Row("alex", 12000.00), Row("amy", 10000.00),
Row("cathy", 9000.00), Row("david", 10000.00), Row("jen", 12000.00)))
@@ -1295,12 +1044,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.agg(sum($"mySalary").as("total"))
.filter($"total" > 1000)
checkAggregateRemoved(df2, false)
- df2.queryExecution.optimizedPlan.collect {
- case _: DataSourceV2ScanRelation =>
- val expectedPlanFragment =
- "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]"
- checkKeywordsExistsInExplain(df2, expectedPlanFragment)
- }
+ checkPushedInfo(df2,
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]")
checkAnswer(df2, Seq(Row("alex", 12000.00), Row("amy", 10000.00),
Row("cathy", 9000.00), Row("david", 10000.00), Row("jen", 12000.00)))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org