You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2021/04/25 12:28:38 UTC

[spark] branch branch-3.0 updated: [SPARK-35168][SQL] mapred.reduce.tasks should be shuffle.partitions not adaptive.coalescePartitions.initialPartitionNum

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4fa0e7f  [SPARK-35168][SQL] mapred.reduce.tasks should be shuffle.partitions not adaptive.coalescePartitions.initialPartitionNum
4fa0e7f is described below

commit 4fa0e7fb2e62710a81c33d4b6041b92c9b484b67
Author: Kent Yao <ya...@apache.org>
AuthorDate: Sun Apr 25 20:27:12 2021 +0800

    [SPARK-35168][SQL] mapred.reduce.tasks should be shuffle.partitions not adaptive.coalescePartitions.initialPartitionNum
    
    ### What changes were proposed in this pull request?
    
    ```sql
    spark-sql> set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1;
    spark.sql.adaptive.coalescePartitions.initialPartitionNum	1
    Time taken: 2.18 seconds, Fetched 1 row(s)
    spark-sql> set mapred.reduce.tasks;
    21/04/21 14:27:11 WARN SetCommand: Property mapred.reduce.tasks is deprecated, showing spark.sql.shuffle.partitions instead.
    spark.sql.shuffle.partitions	1
    Time taken: 0.03 seconds, Fetched 1 row(s)
    spark-sql> set spark.sql.shuffle.partitions;
    spark.sql.shuffle.partitions	200
    Time taken: 0.024 seconds, Fetched 1 row(s)
    spark-sql> set mapred.reduce.tasks=2;
    21/04/21 14:31:52 WARN SetCommand: Property mapred.reduce.tasks is deprecated, automatically converted to spark.sql.shuffle.partitions instead.
    spark.sql.shuffle.partitions	2
    Time taken: 0.017 seconds, Fetched 1 row(s)
    spark-sql> set mapred.reduce.tasks;
    21/04/21 14:31:55 WARN SetCommand: Property mapred.reduce.tasks is deprecated, showing spark.sql.shuffle.partitions instead.
    spark.sql.shuffle.partitions	1
    Time taken: 0.017 seconds, Fetched 1 row(s)
    spark-sql>
    ```
    
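    `mapred.reduce.tasks` is mapped to `spark.sql.shuffle.partitions` on the write side, but on the read side `spark.sql.adaptive.coalescePartitions.initialPartitionNum` might take precedence over `spark.sql.shuffle.partitions`. This change makes the read side always report `spark.sql.shuffle.partitions`.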
    
    ### Why are the changes needed?
    
    Round-trip consistency for `mapred.reduce.tasks`: reading the property should return the value that was set through it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, `mapred.reduce.tasks` will always report `spark.sql.shuffle.partitions`, whether or not `spark.sql.adaptive.coalescePartitions.initialPartitionNum` is set.
    
    ### How was this patch tested?
    
    A new test in `SQLConfSuite`.
    
    Closes #32265 from yaooqinn/SPARK-35168.
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Kent Yao <ya...@apache.org>
    (cherry picked from commit 5b1353f690bf416fdb3a34c94741425b95f97308)
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../apache/spark/sql/execution/command/SetCommand.scala   |  2 +-
 .../org/apache/spark/sql/internal/SQLConfSuite.scala      | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 3dc1d52..6d30a19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -138,7 +138,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
             s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
         Seq(Row(
           SQLConf.SHUFFLE_PARTITIONS.key,
-          sparkSession.sessionState.conf.numShufflePartitions.toString))
+          sparkSession.sessionState.conf.defaultNumShufflePartitions.toString))
       }
       (keyValueOutput, runFunc)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index d1d64fe..73bbfa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -116,6 +116,21 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     }
   }
 
+  test(s"SPARK-35168: ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} should respect" +
+      s" ${SQLConf.SHUFFLE_PARTITIONS.key}") {
+    spark.sessionState.conf.clear()
+    try {
+      sql(s"SET ${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key}=true")
+      sql(s"SET ${SQLConf.COALESCE_PARTITIONS_ENABLED.key}=true")
+      sql(s"SET ${SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key}=1")
+      sql(s"SET ${SQLConf.SHUFFLE_PARTITIONS.key}=2")
+      checkAnswer(sql(s"SET ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}"),
+        Row(SQLConf.SHUFFLE_PARTITIONS.key, "2"))
+    } finally {
+      spark.sessionState.conf.clear()
+    }
+  }
+
   test("SPARK-31234: reset will not change static sql configs and spark core configs") {
     val conf = spark.sparkContext.getConf.getAll.toMap
     val appName = conf.get("spark.app.name")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org