Posted to commits@spark.apache.org by do...@apache.org on 2020/12/08 03:07:59 UTC

[spark] branch branch-3.1 updated: [SPARK-33680][SQL][TESTS][FOLLOWUP] Fix more test suites to have explicit confs

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 925b2a8  [SPARK-33680][SQL][TESTS][FOLLOWUP] Fix more test suites to have explicit confs
925b2a8 is described below

commit 925b2a815d609dc18451b3d679b419df3f7689cf
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Mon Dec 7 18:59:15 2020 -0800

    [SPARK-33680][SQL][TESTS][FOLLOWUP] Fix more test suites to have explicit confs
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up to SPARK-33680 that removes the assumption on the default value of `spark.sql.adaptive.enabled` from more test suites.
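    
    A minimal sketch of the pattern this patch applies (the suite and test
    names below are illustrative, not part of the change): plan-shape tests pin
    `spark.sql.adaptive.enabled` explicitly next to the confs they already set,
    so their assertions hold regardless of the build's default.
    
    ```scala
    import org.apache.spark.sql.QueryTest
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.test.SharedSparkSession
    
    // Hypothetical suite, for illustration only.
    class ExplicitConfExampleSuite extends QueryTest with SharedSparkSession {
      test("plan assertions do not depend on the AQE default") {
        withSQLConf(
            SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
            SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
          val planned = spark.range(10).groupBy("id").count()
            .queryExecution.executedPlan
          // With the conf pinned, the plan is never wrapped in
          // AdaptiveSparkPlanExec, whatever the build's default is.
          assert(planned.collect { case a: AdaptiveSparkPlanExec => a }.isEmpty)
        }
      }
    }
    ```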
    
    ### Why are the changes needed?
    
    According to the test results in https://github.com/apache/spark/pull/30628#issuecomment-739866168, the [previous run](https://github.com/apache/spark/pull/30628#issuecomment-739641105) did not run all of the tests.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    Closes #30655 from dongjoon-hyun/SPARK-33680.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit b2a79306ef7b330c5bf4dc1337ed80ebd6e08d0c)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  4 +-
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  |  4 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala     |  9 ++-
 .../apache/spark/sql/execution/PlannerSuite.scala  | 73 +++++++++++++++-------
 .../spark/sql/sources/BucketedReadSuite.scala      |  5 +-
 .../v1/sql/SqlResourceWithActualMetricsSuite.scala | 11 +++-
 6 files changed, 74 insertions(+), 32 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index d4e64aa..78983a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1001,7 +1001,9 @@ class DataFrameAggregateSuite extends QueryTest
 
   Seq(true, false).foreach { value =>
     test(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") {
-      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) {
+      withSQLConf(
+        SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString,
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
         withTempView("t1", "t2") {
           sql("create temporary view t1 as select * from values (1, 2) as t1(a, b)")
           sql("create temporary view t2 as select * from values (3, 4) as t2(c, d)")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 14d03a3..c317f56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -335,7 +335,9 @@ class DataFrameJoinSuite extends QueryTest
 
     withTempDatabase { dbName =>
       withTable(table1Name, table2Name) {
-        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        withSQLConf(
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
           spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
           spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 8755dcc..a728e5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1107,6 +1107,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
 
   test("SPARK-32330: Preserve shuffled hash join build side partitioning") {
     withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
         SQLConf.SHUFFLE_PARTITIONS.key -> "2",
         SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
@@ -1130,6 +1131,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
 
     // Test broadcast hash join
     withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
       Seq("inner", "left_outer").foreach(joinType => {
         val plan = df1.join(df2, $"k1" === $"k2", joinType)
@@ -1146,6 +1148,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
 
     // Test shuffled hash join
     withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
       SQLConf.SHUFFLE_PARTITIONS.key -> "2",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
@@ -1253,6 +1256,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
       withSQLConf(
         // Set broadcast join threshold and number of shuffle partitions,
         // as shuffled hash join depends on these two configs.
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
         SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
         val smjDF = df1.join(df2, joinExprs, "full")
@@ -1284,7 +1288,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     )
     inputDFs.foreach { case (df1, df2, joinType) =>
       // Test broadcast hash join
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") {
+      withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
         val bhjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
         assert(bhjCodegenDF.queryExecution.executedPlan.collect {
           case WholeStageCodegenExec(_ : BroadcastHashJoinExec) => true
@@ -1305,6 +1311,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
         // Set broadcast join threshold and number of shuffle partitions,
         // as shuffled hash join depends on these two configs.
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
         SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
         val shjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
         assert(shjCodegenDF.queryExecution.executedPlan.collect {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 5e30f84..4e01d1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -877,7 +877,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("aliases in the project should not introduce extra shuffle") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       withTempView("df1", "df2") {
         spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
         spark.range(20).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
@@ -897,7 +899,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
 
   test("SPARK-33399: aliases should be handled properly in PartitioningCollection output" +
     " partitioning") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       withTempView("t1", "t2", "t3") {
         spark.range(10).repartition($"id").createTempView("t1")
         spark.range(20).repartition($"id").createTempView("t2")
@@ -927,7 +931,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("SPARK-33399: aliases should be handled properly in HashPartitioning") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       withTempView("t1", "t2", "t3") {
         spark.range(10).repartition($"id").createTempView("t1")
         spark.range(20).repartition($"id").createTempView("t2")
@@ -955,7 +961,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("SPARK-33399: alias handling should happen properly for RangePartitioning") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       val df = spark.range(1, 100)
         .select(col("id").as("id1")).groupBy("id1").count()
       // Plan for this will be Range -> ProjectWithAlias -> HashAggregate -> HashAggregate
@@ -976,7 +984,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
 
   test("SPARK-33399: aliased should be handled properly " +
     "for partitioning and sortorder involving complex expressions") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       withTempView("t1", "t2", "t3") {
         spark.range(10).select(col("id").as("id1")).createTempView("t1")
         spark.range(20).select(col("id").as("id2")).createTempView("t2")
@@ -1014,7 +1024,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("SPARK-33399: alias handling should happen properly for SinglePartition") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       val df = spark.range(1, 100, 1, 1)
         .select(col("id").as("id1")).groupBy("id1").count()
       val planned = df.queryExecution.executedPlan
@@ -1031,7 +1043,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
 
   test("SPARK-33399: No extra exchanges in case of" +
     " [Inner Join -> Project with aliases -> HashAggregate]") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       withTempView("t1", "t2") {
         spark.range(10).repartition($"id").createTempView("t1")
         spark.range(20).repartition($"id").createTempView("t2")
@@ -1060,7 +1074,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("SPARK-33400: Normalization of sortOrder should take care of sameOrderExprs") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       withTempView("t1", "t2", "t3") {
         spark.range(10).repartition($"id").createTempView("t1")
         spark.range(20).repartition($"id").createTempView("t2")
@@ -1091,7 +1107,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("sort order doesn't have repeated expressions") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       withTempView("t1", "t2") {
         spark.range(10).repartition($"id").createTempView("t1")
         spark.range(20).repartition($"id").createTempView("t2")
@@ -1117,7 +1135,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("aliases to expressions should not be replaced") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       withTempView("df1", "df2") {
         spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
         spark.range(20).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
@@ -1143,7 +1163,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("aliases in the aggregate expressions should not introduce extra shuffle") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
       val t2 = spark.range(20).selectExpr("floor(id/4) as k2")
 
@@ -1160,7 +1182,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("aliases in the object hash/sort aggregate expressions should not introduce extra shuffle") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       Seq(true, false).foreach { useObjectHashAgg =>
         withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> useObjectHashAgg.toString) {
           val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
@@ -1185,21 +1209,22 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("aliases in the sort aggregate expressions should not introduce extra sort") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
-      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
-        val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
-        val t2 = spark.range(20).selectExpr("floor(id/4) as k2")
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
+      val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
+      val t2 = spark.range(20).selectExpr("floor(id/4) as k2")
 
-        val agg1 = t1.groupBy("k1").agg(collect_list("k1")).withColumnRenamed("k1", "k3")
-        val agg2 = t2.groupBy("k2").agg(collect_list("k2"))
+      val agg1 = t1.groupBy("k1").agg(collect_list("k1")).withColumnRenamed("k1", "k3")
+      val agg2 = t2.groupBy("k2").agg(collect_list("k2"))
 
-        val planned = agg1.join(agg2, $"k3" === $"k2").queryExecution.executedPlan
-        assert(planned.collect { case s: SortAggregateExec => s }.nonEmpty)
+      val planned = agg1.join(agg2, $"k3" === $"k2").queryExecution.executedPlan
+      assert(planned.collect { case s: SortAggregateExec => s }.nonEmpty)
 
-        // We expect two SortExec nodes on each side of join.
-        val sorts = planned.collect { case s: SortExec => s }
-        assert(sorts.size == 4)
-      }
+      // We expect two SortExec nodes on each side of join.
+      val sorts = planned.collect { case s: SortExec => s }
+      assert(sorts.size == 4)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 167e87d..0ff9303 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, DisableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -39,7 +39,8 @@ import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
-class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSparkSession {
+class BucketedReadWithoutHiveSupportSuite
+  extends BucketedReadSuite with DisableAdaptiveExecutionSuite with SharedSparkSession {
   protected override def beforeAll(): Unit = {
     super.beforeAll()
     assert(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
index 0c0e3ac..1510e89 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
@@ -26,7 +26,9 @@ import org.json4s.jackson.JsonMethods
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.history.HistoryServerSuite.getContentAndCode
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
+import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED
 import org.apache.spark.sql.test.SharedSparkSession
 
 case class Person(id: Int, name: String, age: Int)
@@ -35,7 +37,8 @@ case class Salary(personId: Int, salary: Double)
 /**
  * Sql Resource Public API Unit Tests running query and extracting the metrics.
  */
-class SqlResourceWithActualMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
+class SqlResourceWithActualMetricsSuite
+  extends SharedSparkSession with SQLMetricsTestUtils with SQLHelper {
 
   import testImplicits._
 
@@ -52,8 +55,10 @@ class SqlResourceWithActualMetricsSuite extends SharedSparkSession with SQLMetri
 
   test("Check Sql Rest Api Endpoints") {
     // Materialize result DataFrame
-    val count = getDF().count()
-    assert(count == 2, s"Expected Query Count is 2 but received: $count")
+    withSQLConf(ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      val count = getDF().count()
+      assert(count == 2, s"Expected Query Count is 2 but received: $count")
+    }
 
     // Spark apps launched in local mode do not seem to have `attemptId` by default,
     // so the UT only covers the existing endpoints.
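
For reference, a minimal sketch of the suite-level alternative used for
BucketedReadWithoutHiveSupportSuite above (the suite and test names here are
illustrative, not part of this patch): mixing DisableAdaptiveExecutionSuite
into a suite turns adaptive execution off around every test it defines,
instead of repeating the conf in each withSQLConf block.

    import org.apache.spark.sql.QueryTest
    import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, DisableAdaptiveExecutionSuite}
    import org.apache.spark.sql.test.SharedSparkSession

    // Hypothetical suite, for illustration only; the mixin order follows the
    // BucketedReadWithoutHiveSupportSuite change above.
    class PlanShapeWithoutAqeSuite extends QueryTest
      with DisableAdaptiveExecutionSuite
      with SharedSparkSession {

      test("executed plan is inspected without AQE wrapping") {
        // The mixin disables spark.sql.adaptive.enabled around each test, so
        // queryExecution.executedPlan is not wrapped in AdaptiveSparkPlanExec
        // and pattern matches on concrete operators stay stable.
        val planned = spark.range(10).groupBy("id").count()
          .queryExecution.executedPlan
        assert(planned.collect { case a: AdaptiveSparkPlanExec => a }.isEmpty)
      }
    }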

