Posted to commits@spark.apache.org by do...@apache.org on 2020/12/08 03:07:59 UTC
[spark] branch branch-3.1 updated: [SPARK-33680][SQL][TESTS][FOLLOWUP] Fix more test suites to have explicit confs
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 925b2a8 [SPARK-33680][SQL][TESTS][FOLLOWUP] Fix more test suites to have explicit confs
925b2a8 is described below
commit 925b2a815d609dc18451b3d679b419df3f7689cf
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Mon Dec 7 18:59:15 2020 -0800
[SPARK-33680][SQL][TESTS][FOLLOWUP] Fix more test suites to have explicit confs
### What changes were proposed in this pull request?
This is a follow-up for SPARK-33680 to remove the assumption on the default value of `spark.sql.adaptive.enabled`.
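For context, the pattern applied throughout the suites below is to pin the conf explicitly inside `withSQLConf` instead of relying on its default. A minimal sketch of that pattern (the suite and test names here are hypothetical; `withSQLConf`, `SQLConf`, `QueryTest`, and `SharedSparkSession` are the existing Spark test helpers):

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite illustrating the explicit-conf pattern used in this patch.
class ExplicitConfExampleSuite extends QueryTest with SharedSparkSession {
  test("plan-sensitive assertion with AQE pinned off") {
    // Pin adaptive execution off explicitly rather than assuming the
    // default of spark.sql.adaptive.enabled, so the test still passes
    // if that default ever changes.
    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
      val df = spark.range(10).join(spark.range(20), "id")
      checkAnswer(df, (0 until 10).map(i => Row(i.toLong)))
    }
  }
}
```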
### Why are the changes needed?
According to the test result https://github.com/apache/spark/pull/30628#issuecomment-739866168, the [previous run](https://github.com/apache/spark/pull/30628#issuecomment-739641105) didn't run all tests.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
Closes #30655 from dongjoon-hyun/SPARK-33680.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit b2a79306ef7b330c5bf4dc1337ed80ebd6e08d0c)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/spark/sql/DataFrameAggregateSuite.scala | 4 +-
.../org/apache/spark/sql/DataFrameJoinSuite.scala | 4 +-
.../scala/org/apache/spark/sql/JoinSuite.scala | 9 ++-
.../apache/spark/sql/execution/PlannerSuite.scala | 73 +++++++++++++++-------
.../spark/sql/sources/BucketedReadSuite.scala | 5 +-
.../v1/sql/SqlResourceWithActualMetricsSuite.scala | 11 +++-
6 files changed, 74 insertions(+), 32 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index d4e64aa..78983a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1001,7 +1001,9 @@ class DataFrameAggregateSuite extends QueryTest
Seq(true, false).foreach { value =>
test(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") {
- withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) {
+ withSQLConf(
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString,
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
withTempView("t1", "t2") {
sql("create temporary view t1 as select * from values (1, 2) as t1(a, b)")
sql("create temporary view t2 as select * from values (3, 4) as t2(c, d)")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 14d03a3..c317f56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -335,7 +335,9 @@ class DataFrameJoinSuite extends QueryTest
withTempDatabase { dbName =>
withTable(table1Name, table2Name) {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 8755dcc..a728e5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1107,6 +1107,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
test("SPARK-32330: Preserve shuffled hash join build side partitioning") {
withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
@@ -1130,6 +1131,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
// Test broadcast hash join
withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
Seq("inner", "left_outer").foreach(joinType => {
val plan = df1.join(df2, $"k1" === $"k2", joinType)
@@ -1146,6 +1148,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
// Test shuffled hash join
withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
@@ -1253,6 +1256,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
withSQLConf(
// Set broadcast join threshold and number of shuffle partitions,
// as shuffled hash join depends on these two configs.
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
val smjDF = df1.join(df2, joinExprs, "full")
@@ -1284,7 +1288,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
)
inputDFs.foreach { case (df1, df2, joinType) =>
// Test broadcast hash join
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val bhjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
assert(bhjCodegenDF.queryExecution.executedPlan.collect {
case WholeStageCodegenExec(_ : BroadcastHashJoinExec) => true
@@ -1305,6 +1311,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
// Set broadcast join threshold and number of shuffle partitions,
// as shuffled hash join depends on these two configs.
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
val shjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
assert(shjCodegenDF.queryExecution.executedPlan.collect {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 5e30f84..4e01d1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -877,7 +877,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("aliases in the project should not introduce extra shuffle") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
withTempView("df1", "df2") {
spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
spark.range(20).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
@@ -897,7 +899,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
test("SPARK-33399: aliases should be handled properly in PartitioningCollection output" +
" partitioning") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
withTempView("t1", "t2", "t3") {
spark.range(10).repartition($"id").createTempView("t1")
spark.range(20).repartition($"id").createTempView("t2")
@@ -927,7 +931,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("SPARK-33399: aliases should be handled properly in HashPartitioning") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
withTempView("t1", "t2", "t3") {
spark.range(10).repartition($"id").createTempView("t1")
spark.range(20).repartition($"id").createTempView("t2")
@@ -955,7 +961,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("SPARK-33399: alias handling should happen properly for RangePartitioning") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val df = spark.range(1, 100)
.select(col("id").as("id1")).groupBy("id1").count()
// Plan for this will be Range -> ProjectWithAlias -> HashAggregate -> HashAggregate
@@ -976,7 +984,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
test("SPARK-33399: aliased should be handled properly " +
"for partitioning and sortorder involving complex expressions") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
withTempView("t1", "t2", "t3") {
spark.range(10).select(col("id").as("id1")).createTempView("t1")
spark.range(20).select(col("id").as("id2")).createTempView("t2")
@@ -1014,7 +1024,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("SPARK-33399: alias handling should happen properly for SinglePartition") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val df = spark.range(1, 100, 1, 1)
.select(col("id").as("id1")).groupBy("id1").count()
val planned = df.queryExecution.executedPlan
@@ -1031,7 +1043,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
test("SPARK-33399: No extra exchanges in case of" +
" [Inner Join -> Project with aliases -> HashAggregate]") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
withTempView("t1", "t2") {
spark.range(10).repartition($"id").createTempView("t1")
spark.range(20).repartition($"id").createTempView("t2")
@@ -1060,7 +1074,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("SPARK-33400: Normalization of sortOrder should take care of sameOrderExprs") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
withTempView("t1", "t2", "t3") {
spark.range(10).repartition($"id").createTempView("t1")
spark.range(20).repartition($"id").createTempView("t2")
@@ -1091,7 +1107,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("sort order doesn't have repeated expressions") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
withTempView("t1", "t2") {
spark.range(10).repartition($"id").createTempView("t1")
spark.range(20).repartition($"id").createTempView("t2")
@@ -1117,7 +1135,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("aliases to expressions should not be replaced") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
withTempView("df1", "df2") {
spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
spark.range(20).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
@@ -1143,7 +1163,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("aliases in the aggregate expressions should not introduce extra shuffle") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
val t2 = spark.range(20).selectExpr("floor(id/4) as k2")
@@ -1160,7 +1182,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("aliases in the object hash/sort aggregate expressions should not introduce extra shuffle") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
Seq(true, false).foreach { useObjectHashAgg =>
withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> useObjectHashAgg.toString) {
val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
@@ -1185,21 +1209,22 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("aliases in the sort aggregate expressions should not introduce extra sort") {
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
- withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
- val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
- val t2 = spark.range(20).selectExpr("floor(id/4) as k2")
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+ SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
+ val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
+ val t2 = spark.range(20).selectExpr("floor(id/4) as k2")
- val agg1 = t1.groupBy("k1").agg(collect_list("k1")).withColumnRenamed("k1", "k3")
- val agg2 = t2.groupBy("k2").agg(collect_list("k2"))
+ val agg1 = t1.groupBy("k1").agg(collect_list("k1")).withColumnRenamed("k1", "k3")
+ val agg2 = t2.groupBy("k2").agg(collect_list("k2"))
- val planned = agg1.join(agg2, $"k3" === $"k2").queryExecution.executedPlan
- assert(planned.collect { case s: SortAggregateExec => s }.nonEmpty)
+ val planned = agg1.join(agg2, $"k3" === $"k2").queryExecution.executedPlan
+ assert(planned.collect { case s: SortAggregateExec => s }.nonEmpty)
- // We expect two SortExec nodes on each side of join.
- val sorts = planned.collect { case s: SortExec => s }
- assert(sorts.size == 4)
- }
+ // We expect two SortExec nodes on each side of join.
+ val sorts = planned.collect { case s: SortExec => s }
+ assert(sorts.size == 4)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 167e87d..0ff9303 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, DisableAdaptiveExecutionSuite}
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -39,7 +39,8 @@ import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.BitSet
-class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSparkSession {
+class BucketedReadWithoutHiveSupportSuite
+ extends BucketedReadSuite with DisableAdaptiveExecutionSuite with SharedSparkSession {
protected override def beforeAll(): Unit = {
super.beforeAll()
assert(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
index 0c0e3ac..1510e89 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
@@ -26,7 +26,9 @@ import org.json4s.jackson.JsonMethods
import org.apache.spark.SparkConf
import org.apache.spark.deploy.history.HistoryServerSuite.getContentAndCode
import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
+import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED
import org.apache.spark.sql.test.SharedSparkSession
case class Person(id: Int, name: String, age: Int)
@@ -35,7 +37,8 @@ case class Salary(personId: Int, salary: Double)
/**
* Sql Resource Public API Unit Tests running query and extracting the metrics.
*/
-class SqlResourceWithActualMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
+class SqlResourceWithActualMetricsSuite
+ extends SharedSparkSession with SQLMetricsTestUtils with SQLHelper {
import testImplicits._
@@ -52,8 +55,10 @@ class SqlResourceWithActualMetricsSuite extends SharedSparkSession with SQLMetri
test("Check Sql Rest Api Endpoints") {
// Materialize result DataFrame
- val count = getDF().count()
- assert(count == 2, s"Expected Query Count is 2 but received: $count")
+ withSQLConf(ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ val count = getDF().count()
+ assert(count == 2, s"Expected Query Count is 2 but received: $count")
+ }
// Spark apps launched by local-mode seems not having `attemptId` as default
// so UT is just added for existing endpoints.