Posted to commits@spark.apache.org by we...@apache.org on 2022/03/23 13:43:57 UTC

[spark] branch branch-3.3 updated: [SPARK-37483][SQL][FOLLOWUP] Rename `pushedTopN` to `PushedTopN` and improve JDBCV2Suite

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 737077a  [SPARK-37483][SQL][FOLLOWUP] Rename `pushedTopN` to `PushedTopN` and improve JDBCV2Suite
737077a is described below

commit 737077af04c1a62a99bfd5dba731174dd29f97f4
Author: Jiaan Geng <be...@163.com>
AuthorDate: Wed Mar 23 21:40:44 2022 +0800

    [SPARK-37483][SQL][FOLLOWUP] Rename `pushedTopN` to `PushedTopN` and improve JDBCV2Suite
    
    ### What changes were proposed in this pull request?
    This PR fixes three issues (the new helpers are sketched below).
    **First**, create the methods `checkPushedInfo` and `checkSortRemoved` to reuse code.
    **Second**, remove the method `checkPushedLimit`, because `checkPushedInfo` covers it.
    **Third**, rename `pushedTopN` to `PushedTopN`, to be consistent with the other pushed information.
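
    For context, the two helpers introduced by this change are reproduced from the diff
    below; they rely on the surrounding JDBCV2Suite, which provides
    `checkKeywordsExistsInExplain` and the registered `h2` test catalog:

        private def checkPushedInfo(df: DataFrame, expectedPlanFragment: String): Unit = {
          // Only assert on the explain output when the plan actually contains a DSv2 scan.
          df.queryExecution.optimizedPlan.collect {
            case _: DataSourceV2ScanRelation =>
              checkKeywordsExistsInExplain(df, expectedPlanFragment)
          }
        }

        private def checkSortRemoved(df: DataFrame, removed: Boolean = true): Unit = {
          // A fully pushed-down top-N leaves no Sort node in the optimized plan.
          val sorts = df.queryExecution.optimizedPlan.collect {
            case s: Sort => s
          }
          if (removed) {
            assert(sorts.isEmpty)
          } else {
            assert(sorts.nonEmpty)
          }
        }

    A typical adjusted test then reads (also taken from the diff):

        val df2 = spark.read.table("h2.test.employee")
          .where($"dept" === 1).orderBy($"salary").limit(1)
        checkSortRemoved(df2) // the top-N was pushed down, so no Sort remains
        checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], " +
          "PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ")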
    
    ### Why are the changes needed?
    Reuse code and make the pushed information more accurate and consistent.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'. This only renames pushed information shown in explain output and improves the tests (see the example below).
    
    ### How was this patch tested?
    Adjust existing tests.
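
    As a hedged illustration (not part of the patch): assuming a spark-shell session
    with the suite's H2 catalog registered as `h2`, a pushed-down top-N query now
    reports `PushedTopN` instead of `pushedTopN` in its explain output:

        val df = spark.read.table("h2.test.employee").sort("salary").limit(1)
        df.explain()
        // before this patch: ... pushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1 ...
        // after this patch:  ... PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1 ...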
    
    Closes #35921 from beliefer/SPARK-37483_followup.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 4fe55c522d7bc34487f21d0e69fc7c230d61a3bf)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   |   2 +-
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 517 ++++++---------------
 2 files changed, 132 insertions(+), 387 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index e6de7d0..5067cd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -148,7 +148,7 @@ case class RowDataSourceScanExec(
         val pushedTopN =
           s"ORDER BY ${seqToString(pushedDownOperators.sortValues.map(_.describe()))}" +
           s" LIMIT ${pushedDownOperators.limit.get}"
-        Some("pushedTopN" -> pushedTopN)
+        Some("PushedTopN" -> pushedTopN)
     } else {
       pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value")
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 31fdb02..e7e9174 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sort}
-import org.apache.spark.sql.connector.expressions.{FieldReference, NullOrdering, SortDirection, SortValue}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.functions.{avg, count, count_distinct, lit, not, sum, udf, when}
@@ -110,13 +109,20 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(sql("SELECT name, id FROM h2.test.people"), Seq(Row("fred", 1), Row("mary", 2)))
   }
 
+  private def checkPushedInfo(df: DataFrame, expectedPlanFragment: String): Unit = {
+    df.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        checkKeywordsExistsInExplain(df, expectedPlanFragment)
+    }
+  }
+
   // TABLESAMPLE ({integer_expression | decimal_expression} PERCENT) and
   // TABLESAMPLE (BUCKET integer_expression OUT OF integer_expression)
   // are tested in JDBC dialect tests because TABLESAMPLE is not supported by all the DBMS
   test("TABLESAMPLE (integer_expression ROWS) is the same as LIMIT") {
     val df = sql("SELECT NAME FROM h2.test.employee TABLESAMPLE (3 ROWS)")
     checkSchemaNames(df, Seq("NAME"))
-    checkPushedLimit(df, Some(3))
+    checkPushedInfo(df, "PushedFilters: [], PushedLimit: LIMIT 3, ")
     checkAnswer(df, Seq(Row("amy"), Row("alex"), Row("cathy")))
   }
 
@@ -130,7 +136,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   test("simple scan with LIMIT") {
     val df1 = spark.read.table("h2.test.employee")
       .where($"dept" === 1).limit(1)
-    checkPushedLimit(df1, Some(1))
+    checkPushedInfo(df1,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, ")
     checkAnswer(df1, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
 
     val df2 = spark.read
@@ -141,19 +148,22 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .table("h2.test.employee")
       .filter($"dept" > 1)
       .limit(1)
-    checkPushedLimit(df2, Some(1))
+    checkPushedInfo(df2,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], PushedLimit: LIMIT 1, ")
     checkAnswer(df2, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
 
     val df3 = sql("SELECT name FROM h2.test.employee WHERE dept > 1 LIMIT 1")
     checkSchemaNames(df3, Seq("NAME"))
-    checkPushedLimit(df3, Some(1))
+    checkPushedInfo(df3,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], PushedLimit: LIMIT 1, ")
     checkAnswer(df3, Seq(Row("alex")))
 
     val df4 = spark.read
       .table("h2.test.employee")
       .groupBy("DEPT").sum("SALARY")
       .limit(1)
-    checkPushedLimit(df4, None)
+    checkPushedInfo(df4,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT], ")
     checkAnswer(df4, Seq(Row(1, 19000.00)))
 
     val name = udf { (x: String) => x.matches("cat|dav|amy") }
@@ -164,24 +174,18 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter(name($"shortName"))
       .limit(1)
     // LIMIT is pushed down only if all the filters are pushed down
-    checkPushedLimit(df5, None)
+    checkPushedInfo(df5, "PushedFilters: [], ")
     checkAnswer(df5, Seq(Row(10000.00, 1000.0, "amy")))
   }
 
-  private def checkPushedLimit(df: DataFrame, limit: Option[Int] = None,
-      sortValues: Seq[SortValue] = Nil): Unit = {
-    df.queryExecution.optimizedPlan.collect {
-      case relation: DataSourceV2ScanRelation => relation.scan match {
-        case v1: V1ScanWrapper =>
-          assert(v1.pushedDownOperators.limit === limit)
-          assert(v1.pushedDownOperators.sortValues === sortValues)
-      }
+  private def checkSortRemoved(df: DataFrame, removed: Boolean = true): Unit = {
+    val sorts = df.queryExecution.optimizedPlan.collect {
+      case s: Sort => s
     }
-    if (sortValues.nonEmpty) {
-      val sorts = df.queryExecution.optimizedPlan.collect {
-        case s: Sort => s
-      }
+    if (removed) {
       assert(sorts.isEmpty)
+    } else {
+      assert(sorts.nonEmpty)
     }
   }
 
@@ -190,12 +194,16 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .table("h2.test.employee")
       .sort("salary")
       .limit(1)
-    checkPushedLimit(df1, Some(1), createSortValues())
+    checkSortRemoved(df1)
+    checkPushedInfo(df1,
+      "PushedFilters: [], PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ")
     checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
 
     val df2 = spark.read.table("h2.test.employee")
       .where($"dept" === 1).orderBy($"salary").limit(1)
-    checkPushedLimit(df2, Some(1), createSortValues())
+    checkSortRemoved(df2)
+    checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], " +
+      "PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ")
     checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
 
     val df3 = spark.read
@@ -207,19 +215,23 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter($"dept" > 1)
       .orderBy($"salary".desc)
       .limit(1)
-    checkPushedLimit(
-      df3, Some(1), createSortValues(SortDirection.DESCENDING, NullOrdering.NULLS_LAST))
+    checkSortRemoved(df3)
+    checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " +
+      "PushedTopN: ORDER BY [salary DESC NULLS LAST] LIMIT 1, ")
     checkAnswer(df3, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
 
     val df4 =
       sql("SELECT name FROM h2.test.employee WHERE dept > 1 ORDER BY salary NULLS LAST LIMIT 1")
     checkSchemaNames(df4, Seq("NAME"))
-    checkPushedLimit(df4, Some(1), createSortValues(nullOrdering = NullOrdering.NULLS_LAST))
+    checkSortRemoved(df4)
+    checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " +
+      "PushedTopN: ORDER BY [salary ASC NULLS LAST] LIMIT 1, ")
     checkAnswer(df4, Seq(Row("david")))
 
     val df5 = spark.read.table("h2.test.employee")
       .where($"dept" === 1).orderBy($"salary")
-    checkPushedLimit(df5, None)
+    checkSortRemoved(df5, false)
+    checkPushedInfo(df5, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ")
     checkAnswer(df5,
       Seq(Row(1, "cathy", 9000.00, 1200.0, false), Row(1, "amy", 10000.00, 1000.0, true)))
 
@@ -228,7 +240,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .groupBy("DEPT").sum("SALARY")
       .orderBy("DEPT")
       .limit(1)
-    checkPushedLimit(df6)
+    checkSortRemoved(df6, false)
+    checkPushedInfo(df6, "PushedAggregates: [SUM(SALARY)]," +
+      " PushedFilters: [], PushedGroupByColumns: [DEPT], ")
     checkAnswer(df6, Seq(Row(1, 19000.00)))
 
     val name = udf { (x: String) => x.matches("cat|dav|amy") }
@@ -240,145 +254,69 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .sort($"SALARY".desc)
       .limit(1)
     // LIMIT is pushed down only if all the filters are pushed down
-    checkPushedLimit(df7)
+    checkSortRemoved(df7, false)
+    checkPushedInfo(df7, "PushedFilters: [], ")
     checkAnswer(df7, Seq(Row(10000.00, 1000.0, "amy")))
 
     val df8 = spark.read
       .table("h2.test.employee")
       .sort(sub($"NAME"))
       .limit(1)
-    checkPushedLimit(df8)
+    checkSortRemoved(df8, false)
+    checkPushedInfo(df8, "PushedFilters: [], ")
     checkAnswer(df8, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
   }
 
-  private def createSortValues(
-      sortDirection: SortDirection = SortDirection.ASCENDING,
-      nullOrdering: NullOrdering = NullOrdering.NULLS_FIRST): Seq[SortValue] = {
-    Seq(SortValue(FieldReference("salary"), sortDirection, nullOrdering))
-  }
-
   test("scan with filter push-down") {
     val df = spark.table("h2.test.people").filter($"id" > 1)
-
     checkFiltersRemoved(df)
-
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [ID IS NOT NULL, ID > 1]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df, "PushedFilters: [ID IS NOT NULL, ID > 1], ")
     checkAnswer(df, Row("mary", 2))
 
     val df2 = spark.table("h2.test.employee").filter($"name".isin("amy", "cathy"))
-
     checkFiltersRemoved(df2)
-
-    df2.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [NAME IN ('amy', 'cathy')]"
-        checkKeywordsExistsInExplain(df2, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df2, "PushedFilters: [NAME IN ('amy', 'cathy')]")
     checkAnswer(df2, Seq(Row(1, "amy", 10000, 1000, true), Row(1, "cathy", 9000, 1200, false)))
 
     val df3 = spark.table("h2.test.employee").filter($"name".startsWith("a"))
-
     checkFiltersRemoved(df3)
-
-    df3.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [NAME IS NOT NULL, NAME LIKE 'a%']"
-        checkKeywordsExistsInExplain(df3, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df3, "PushedFilters: [NAME IS NOT NULL, NAME LIKE 'a%']")
     checkAnswer(df3, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "alex", 12000, 1200, false)))
 
     val df4 = spark.table("h2.test.employee").filter($"is_manager")
-
     checkFiltersRemoved(df4)
-
-    df4.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true]"
-        checkKeywordsExistsInExplain(df4, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df4, "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true]")
     checkAnswer(df4, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "david", 10000, 1300, true),
       Row(6, "jen", 12000, 1200, true)))
 
     val df5 = spark.table("h2.test.employee").filter($"is_manager".and($"salary" > 10000))
-
     checkFiltersRemoved(df5)
-
-    df5.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [IS_MANAGER IS NOT NULL, SALARY IS NOT NULL, " +
-            "IS_MANAGER = true, SALARY > 10000.00]"
-        checkKeywordsExistsInExplain(df5, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df5, "PushedFilters: [IS_MANAGER IS NOT NULL, SALARY IS NOT NULL, " +
+      "IS_MANAGER = true, SALARY > 10000.00]")
     checkAnswer(df5, Seq(Row(6, "jen", 12000, 1200, true)))
 
     val df6 = spark.table("h2.test.employee").filter($"is_manager".or($"salary" > 10000))
-
     checkFiltersRemoved(df6)
-
-    df6.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [(IS_MANAGER = true) OR (SALARY > 10000.00)], "
-        checkKeywordsExistsInExplain(df6, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df6, "PushedFilters: [(IS_MANAGER = true) OR (SALARY > 10000.00)], ")
     checkAnswer(df6, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "alex", 12000, 1200, false),
       Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
 
     val df7 = spark.table("h2.test.employee").filter(not($"is_manager") === true)
-
     checkFiltersRemoved(df7)
-
-    df7.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [IS_MANAGER IS NOT NULL, NOT (IS_MANAGER = true)], "
-        checkKeywordsExistsInExplain(df7, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df7, "PushedFilters: [IS_MANAGER IS NOT NULL, NOT (IS_MANAGER = true)], ")
     checkAnswer(df7, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "alex", 12000, 1200, false)))
 
     val df8 = spark.table("h2.test.employee").filter($"is_manager" === true)
-
     checkFiltersRemoved(df8)
-
-    df8.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true], "
-        checkKeywordsExistsInExplain(df8, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df8, "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true], ")
     checkAnswer(df8, Seq(Row(1, "amy", 10000, 1000, true),
       Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
 
     val df9 = spark.table("h2.test.employee")
       .filter(when($"dept" > 1, true).when($"is_manager", false).otherwise($"dept" > 3))
-
     checkFiltersRemoved(df9)
-
-    df9.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [CASE WHEN DEPT > 1 THEN TRUE WHEN IS_MANAGER = true THEN FALSE" +
-            " ELSE DEPT > 3 END], "
-        checkKeywordsExistsInExplain(df9, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df9, "PushedFilters: [CASE WHEN DEPT > 1 THEN TRUE " +
+      "WHEN IS_MANAGER = true THEN FALSE ELSE DEPT > 3 END], ")
     checkAnswer(df9, Seq(Row(2, "alex", 12000, 1200, false),
       Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
   }
@@ -387,19 +325,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     Seq(false, true).foreach { ansiMode =>
       withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiMode.toString) {
         val df = spark.table("h2.test.people").filter($"id" + 1 > 1)
-
         checkFiltersRemoved(df, ansiMode)
-
-        df.queryExecution.optimizedPlan.collect {
-          case _: DataSourceV2ScanRelation =>
-            val expected_plan_fragment = if (ansiMode) {
-              "PushedFilters: [ID IS NOT NULL, (ID + 1) > 1]"
-            } else {
-              "PushedFilters: [ID IS NOT NULL]"
-            }
-            checkKeywordsExistsInExplain(df, expected_plan_fragment)
+        val expectedPlanFragment = if (ansiMode) {
+          "PushedFilters: [ID IS NOT NULL, (ID + 1) > 1]"
+        } else {
+          "PushedFilters: [ID IS NOT NULL]"
         }
-
+        checkPushedInfo(df, expectedPlanFragment)
         checkAnswer(df, Seq(Row("fred", 1), Row("mary", 2)))
 
         val df2 = spark.table("h2.test.people").filter($"id" + Int.MaxValue > 1)
@@ -432,18 +364,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
                         |""".stripMargin)
 
         checkFiltersRemoved(df3, ansiMode)
-
-        df3.queryExecution.optimizedPlan.collect {
-          case _: DataSourceV2ScanRelation =>
-            val expected_plan_fragment = if (ansiMode) {
-              "PushedFilters: [(CASE WHEN SALARY > 10000.00 THEN BONUS" +
-                " ELSE BONUS + 200.0 END) > 1200.0]"
-            } else {
-              "PushedFilters: []"
-            }
-            checkKeywordsExistsInExplain(df3, expected_plan_fragment)
+        val expectedPlanFragment3 = if (ansiMode) {
+          "PushedFilters: [(CASE WHEN SALARY > 10000.00 THEN BONUS" +
+            " ELSE BONUS + 200.0 END) > 1200.0]"
+        } else {
+          "PushedFilters: []"
         }
-
+        checkPushedInfo(df3, expectedPlanFragment3)
         checkAnswer(df3,
           Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "david", 10000, 1300, true)))
       }
@@ -587,14 +514,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " group by DePt")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(SALARY), AVG(BONUS)], " +
-            "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), AVG(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
+      "PushedGroupByColumns: [DEPT], ")
     checkAnswer(df, Seq(Row(10000, 1100.0), Row(12000, 1250.0), Row(12000, 1200.0)))
   }
 
@@ -613,14 +535,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     val df = sql("select MAX(ID), AVG(ID) FROM h2.test.people where id > 0")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(ID), AVG(ID)], " +
-            "PushedFilters: [ID IS NOT NULL, ID > 0], " +
-            "PushedGroupByColumns: [], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [MAX(ID), AVG(ID)], " +
+      "PushedFilters: [ID IS NOT NULL, ID > 0], " +
+      "PushedGroupByColumns: [], ")
     checkAnswer(df, Seq(Row(2, 1.5)))
   }
 
@@ -650,42 +567,28 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
           "PushedAggregates: [MAX(SALARY)]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
+    checkPushedInfo(df, "PushedAggregates: [MAX(SALARY)]")
     checkAnswer(df, Seq(Row(12001)))
   }
 
   test("scan with aggregate push-down: COUNT(*)") {
     val df = sql("select COUNT(*) FROM h2.test.employee")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(*)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COUNT(*)]")
     checkAnswer(df, Seq(Row(5)))
   }
 
   test("scan with aggregate push-down: COUNT(col)") {
     val df = sql("select COUNT(DEPT) FROM h2.test.employee")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(DEPT)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COUNT(DEPT)]")
     checkAnswer(df, Seq(Row(5)))
   }
 
   test("scan with aggregate push-down: COUNT(DISTINCT col)") {
     val df = sql("select COUNT(DISTINCT DEPT) FROM h2.test.employee")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(DISTINCT DEPT)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COUNT(DISTINCT DEPT)]")
     checkAnswer(df, Seq(Row(3)))
   }
 
@@ -704,52 +607,30 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   test("scan with aggregate push-down: SUM without filer and group by") {
     val df = sql("SELECT SUM(SALARY) FROM h2.test.employee")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY)]")
     checkAnswer(df, Seq(Row(53000)))
   }
 
   test("scan with aggregate push-down: DISTINCT SUM without filer and group by") {
     val df = sql("SELECT SUM(DISTINCT SALARY) FROM h2.test.employee")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(DISTINCT SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(DISTINCT SALARY)]")
     checkAnswer(df, Seq(Row(31000)))
   }
 
   test("scan with aggregate push-down: SUM with group by") {
     val df = sql("SELECT SUM(SALARY) FROM h2.test.employee GROUP BY DEPT")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY)], " +
-            "PushedFilters: [], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY)], " +
+      "PushedFilters: [], PushedGroupByColumns: [DEPT], ")
     checkAnswer(df, Seq(Row(19000), Row(22000), Row(12000)))
   }
 
   test("scan with aggregate push-down: DISTINCT SUM with group by") {
     val df = sql("SELECT SUM(DISTINCT SALARY) FROM h2.test.employee GROUP BY DEPT")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(DISTINCT SALARY)], " +
-            "PushedFilters: [], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(DISTINCT SALARY)], " +
+      "PushedFilters: [], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(19000), Row(22000), Row(12000)))
   }
 
@@ -758,14 +639,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " group by DEPT, NAME")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
-            "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-            "PushedGroupByColumns: [DEPT, NAME], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT, NAME]")
     checkAnswer(df, Seq(Row(9000, 1200), Row(12000, 1200), Row(10000, 1300),
       Row(10000, 1000), Row(12000, 1200)))
   }
@@ -778,14 +653,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     }
     assert(filters1.isEmpty)
     checkAggregateRemoved(df1)
-    df1.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(SALARY)], " +
-            "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-            "PushedGroupByColumns: [DEPT, NAME], "
-        checkKeywordsExistsInExplain(df1, expected_plan_fragment)
-    }
+    checkPushedInfo(df1, "PushedAggregates: [MAX(SALARY)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT, NAME]")
     checkAnswer(df1, Seq(Row("1#amy", 10000), Row("1#cathy", 9000), Row("2#alex", 12000),
       Row("2#david", 10000), Row("6#jen", 12000)))
 
@@ -796,30 +665,16 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     }
     assert(filters2.isEmpty)
     checkAggregateRemoved(df2)
-    df2.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
-            "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-            "PushedGroupByColumns: [DEPT, NAME], "
-        checkKeywordsExistsInExplain(df2, expected_plan_fragment)
-    }
+    checkPushedInfo(df2, "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT, NAME]")
     checkAnswer(df2, Seq(Row("1#amy", 11000), Row("1#cathy", 10200), Row("2#alex", 13200),
       Row("2#david", 11300), Row("6#jen", 13200)))
 
     val df3 = sql("select concat_ws('#', DEPT, NAME), MAX(SALARY) + MIN(BONUS)" +
       " FROM h2.test.employee where dept > 0 group by concat_ws('#', DEPT, NAME)")
-    val filters3 = df3.queryExecution.optimizedPlan.collect {
-      case f: Filter => f
-    }
-    assert(filters3.isEmpty)
+    checkFiltersRemoved(df3)
     checkAggregateRemoved(df3, false)
-    df3.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], "
-        checkKeywordsExistsInExplain(df3, expected_plan_fragment)
-    }
+    checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], ")
     checkAnswer(df3, Seq(Row("1#amy", 11000), Row("1#cathy", 10200), Row("2#alex", 13200),
       Row("2#david", 11300), Row("6#jen", 13200)))
   }
@@ -827,19 +682,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   test("scan with aggregate push-down: with having clause") {
     val df = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
       " group by DEPT having MIN(BONUS) > 1000")
-    val filters = df.queryExecution.optimizedPlan.collect {
-      case f: Filter => f  // filter over aggregate not push down
-    }
-    assert(filters.nonEmpty)
+    // filter over aggregate not push down
+    checkFiltersRemoved(df, false)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
-            "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(12000, 1200), Row(12000, 1200)))
   }
 
@@ -848,14 +695,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .groupBy($"DEPT")
       .min("SALARY").as("total")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MIN(SALARY)], " +
-            "PushedFilters: [], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [MIN(SALARY)], " +
+      "PushedFilters: [], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(1, 9000), Row(2, 10000), Row(6, 12000)))
   }
 
@@ -867,19 +708,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .agg(sum($"SALARY").as("total"))
       .filter($"total" > 1000)
       .orderBy($"total")
-    val filters = query.queryExecution.optimizedPlan.collect {
-      case f: Filter => f
-    }
-    assert(filters.nonEmpty) // filter over aggregate not pushed down
-    checkAggregateRemoved(df)
-    query.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY)], " +
-            "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(query, expected_plan_fragment)
-    }
+    checkFiltersRemoved(query, false)// filter over aggregate not pushed down
+    checkAggregateRemoved(query)
+    checkPushedInfo(query, "PushedAggregates: [SUM(SALARY)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(query, Seq(Row(6, 12000), Row(1, 19000), Row(2, 22000)))
   }
 
@@ -888,12 +720,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     val decrease = udf { (x: Double, y: Double) => x - y }
     val query = df.select(decrease(sum($"SALARY"), sum($"BONUS")).as("value"))
     checkAggregateRemoved(query)
-    query.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY), SUM(BONUS)]"
-        checkKeywordsExistsInExplain(query, expected_plan_fragment)
-    }
+    checkPushedInfo(query, "PushedAggregates: [SUM(SALARY), SUM(BONUS)], ")
     checkAnswer(query, Seq(Row(47100.0)))
   }
 
@@ -915,14 +742,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " group by DePt")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [VAR_POP(BONUS), VAR_SAMP(BONUS)], " +
-            "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [VAR_POP(BONUS), VAR_SAMP(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
   }
 
@@ -931,14 +752,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " where dept > 0 group by DePt")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [STDDEV_POP(BONUS), STDDEV_SAMP(BONUS)], " +
-            "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [STDDEV_POP(BONUS), STDDEV_SAMP(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(100d, 141.4213562373095d), Row(50d, 70.71067811865476d), Row(0d, null)))
   }
 
@@ -947,14 +762,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " FROM h2.test.employee where dept > 0 group by DePt")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " +
-            "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
   }
 
@@ -963,14 +772,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " group by DePt")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [CORR(BONUS, BONUS)], " +
-            "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [CORR(BONUS, BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
   }
 
@@ -1032,15 +835,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         |FROM h2.test.employee GROUP BY DEPT
       """.stripMargin)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00)" +
-            " THEN SALARY ELSE 0.00 END), COUNT(CAS..., " +
-            "PushedFilters: [], " +
-            "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df,
+      "PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00)" +
+      " THEN SALARY ELSE 0.00 END), COUNT(CAS..., " +
+      "PushedFilters: [], " +
+      "PushedGroupByColumns: [DEPT], ")
     checkAnswer(df, Seq(Row(1, 1, 1, 1, 1, 0d, 12000d, 0d, 12000d, 12000d, 0d, 0d, 2, 0d),
       Row(2, 2, 2, 2, 2, 0d, 10000d, 0d, 10000d, 10000d, 0d, 0d, 2, 0d),
       Row(2, 2, 2, 2, 2, 0d, 12000d, 0d, 12000d, 12000d, 0d, 0d, 3, 0d)))
@@ -1051,17 +850,14 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiMode.toString) {
         val df = sql("SELECT SUM(2147483647 + DEPT) FROM h2.test.employee")
         checkAggregateRemoved(df, ansiMode)
-        val expected_plan_fragment = if (ansiMode) {
+        val expectedPlanFragment = if (ansiMode) {
           "PushedAggregates: [SUM(2147483647 + DEPT)], " +
             "PushedFilters: [], " +
             "PushedGroupByColumns: []"
         } else {
           "PushedFilters: []"
         }
-        df.queryExecution.optimizedPlan.collect {
-          case _: DataSourceV2ScanRelation =>
-            checkKeywordsExistsInExplain(df, expected_plan_fragment)
-        }
+        checkPushedInfo(df, expectedPlanFragment)
         if (ansiMode) {
           val e = intercept[SparkException] {
             checkAnswer(df, Seq(Row(-10737418233L)))
@@ -1080,12 +876,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     val decrease = udf { (x: Double, y: Double) => x - y }
     val query = df.select(sum(decrease($"SALARY", $"BONUS")).as("value"))
     checkAggregateRemoved(query, false)
-    query.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: []"
-        checkKeywordsExistsInExplain(query, expected_plan_fragment)
-    }
+    checkPushedInfo(query, "PushedFilters: []")
     checkAnswer(query, Seq(Row(47100.0)))
   }
 
@@ -1121,12 +912,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(sql("SELECT `dept id` FROM h2.test.dept"), Seq(Row(1), Row(2)))
     val df = sql("SELECT COUNT(`dept id`) FROM h2.test.dept")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(`dept id`)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COUNT(`dept id`)]")
     checkAnswer(df, Seq(Row(2)))
   }
 
@@ -1135,12 +921,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(sql("SELECT `名` FROM h2.test.person"), Seq(Row(1), Row(2)))
     val df = sql("SELECT COUNT(`名`) FROM h2.test.person")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(`名`)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COUNT(`名`)]")
     checkAnswer(df, Seq(Row(2)))
     // scalastyle:on
   }
@@ -1154,12 +935,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .table("h2.test.employee")
       .agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]")
     checkAnswer(df, Seq(Row(53000.00, 10600.000000, 5)))
 
     val df2 = spark.read
@@ -1171,12 +947,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .groupBy($"name")
       .agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]")
     checkAnswer(df2, Seq(
       Row("alex", 12000.00, 12000.000000, 1),
       Row("amy", 10000.00, 10000.000000, 1),
@@ -1194,12 +965,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .table("h2.test.employee")
       .agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
     checkAggregateRemoved(df, false)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]")
     checkAnswer(df, Seq(Row(53000.00, 10600.000000, 5)))
 
     val df2 = spark.read
@@ -1211,12 +977,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .groupBy($"name")
       .agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
     checkAggregateRemoved(df, false)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]")
     checkAnswer(df2, Seq(
       Row("alex", 12000.00, 12000.000000, 1),
       Row("amy", 10000.00, 10000.000000, 1),
@@ -1240,12 +1001,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .agg(sum($"mySalary").as("total"))
       .filter($"total" > 1000)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expectedPlanFragment =
-          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
-        checkKeywordsExistsInExplain(df, expectedPlanFragment)
-    }
+    checkPushedInfo(df,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
 
     val df2 = spark.table("h2.test.employee")
@@ -1254,12 +1011,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .agg(sum($"mySalary").as("total"))
       .filter($"total" > 1000)
     checkAggregateRemoved(df2)
-    df2.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expectedPlanFragment =
-          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
-        checkKeywordsExistsInExplain(df2, expectedPlanFragment)
-    }
+    checkPushedInfo(df2,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]")
     checkAnswer(df2, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
   }
 
@@ -1275,12 +1028,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .agg(sum($"mySalary").as("total"))
       .filter($"total" > 1000)
     checkAggregateRemoved(df, false)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expectedPlanFragment =
-          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]"
-        checkKeywordsExistsInExplain(df, expectedPlanFragment)
-    }
+    checkPushedInfo(df,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]")
     checkAnswer(df, Seq(Row("alex", 12000.00), Row("amy", 10000.00),
       Row("cathy", 9000.00), Row("david", 10000.00), Row("jen", 12000.00)))
 
@@ -1295,12 +1044,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .agg(sum($"mySalary").as("total"))
       .filter($"total" > 1000)
     checkAggregateRemoved(df2, false)
-    df2.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expectedPlanFragment =
-          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]"
-        checkKeywordsExistsInExplain(df2, expectedPlanFragment)
-    }
+    checkPushedInfo(df2,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]")
     checkAnswer(df2, Seq(Row("alex", 12000.00), Row("amy", 10000.00),
       Row("cathy", 9000.00), Row("david", 10000.00), Row("jen", 12000.00)))
   }
