You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/03/05 04:47:30 UTC

[GitHub] [spark] JkSelf commented on a change in pull request #27793: [SPARK-31037][SQL] refine AQE config names

JkSelf commented on a change in pull request #27793: [SPARK-31037][SQL] refine AQE config names
URL: https://github.com/apache/spark/pull/27793#discussion_r388076296
 
 

 ##########
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##########
 @@ -388,99 +397,98 @@ object SQLConf {
     .internal()
     .doc("Adaptive query execution is skipped when the query does not have exchanges or " +
       "sub-queries. By setting this config to true (together with " +
-      s"'${ADAPTIVE_EXECUTION_ENABLED.key}' enabled), Spark will force apply adaptive query " +
+      s"'${ADAPTIVE_EXECUTION_ENABLED.key}' set to true), Spark will force apply adaptive query " +
       "execution for all supported queries.")
     .version("3.0.0")
     .booleanConf
     .createWithDefault(false)
 
-  val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
-    buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions")
-      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables reducing " +
-        "the number of post-shuffle partitions based on map output statistics.")
+  val ADVISORY_SHUFFLE_PARTITION_SIZE_IN_BYTES =
+    buildConf("spark.sql.adaptive.advisoryShufflePartitionSizeInBytes")
+      .doc("The advisory size in bytes of the shuffle partition during adaptive optimization. " +
+        "It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle " +
+        "partition.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(true)
+      .fallbackConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
 
-  val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
-    buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch")
-      .doc("Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks " +
-        "one by one, fetching continuous shuffle blocks for the same map task in batch can " +
-        "reduce IO and improve performance. Note, multiple continuous blocks exist in single " +
-        s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled, this feature also depends " +
-        "on a relocatable serializer, the concatenation support codec in use and the new version " +
-        "shuffle fetch protocol.")
+  val COALESCE_SHUFFLE_PARTITIONS_ENABLED =
+    buildConf("spark.sql.adaptive.coalesceShufflePartitions.enabled")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark will coalesce " +
+        "contiguous shuffle partitions according to the target size (specified by " +
+        s"'${ADVISORY_SHUFFLE_PARTITION_SIZE_IN_BYTES.key}'), to avoid too many small tasks.")
       .version("3.0.0")
       .booleanConf
       .createWithDefault(true)
 
-  val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
-      .doc("The advisory minimum number of post-shuffle partitions used when " +
-        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
+  val COALESCE_SHUFFLE_PARTITIONS_MIN_PARTITION_NUM =
+    buildConf("spark.sql.adaptive.coalesceShufflePartitions.minPartitionNum")
+      .doc("The minimum number of shuffle partitions after coalescing. This configuration only " +
+        s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+        s"'${COALESCE_SHUFFLE_PARTITIONS_ENABLED.key}' are both true.")
       .version("3.0.0")
       .intConf
-      .checkValue(_ > 0, "The minimum shuffle partition number " +
-        "must be a positive integer.")
+      .checkValue(_ > 0, "The minimum number of partitions must be positive.")
       .createWithDefault(1)
 
-  val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
-    buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
-      .doc("The target post-shuffle input size in bytes of a task. This configuration only has " +
-        s"an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
-      .version("1.6.0")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("64MB")
-
-  val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
-      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
-        "This is used as the initial number of pre-shuffle partitions. By default it equals to " +
-        "spark.sql.shuffle.partitions. This configuration only has an effect when " +
-        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
+  val COALESCE_SHUFFLE_PARTITIONS_INITIAL_PARTITION_NUM =
+    buildConf("spark.sql.adaptive.coalesceShufflePartitions.initialPartitionNum")
+      .doc("The initial number of shuffle partitions before coalescing. By default it equals to " +
+        s"${SHUFFLE_PARTITIONS.key}. This configuration only has an effect when " +
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and '${COALESCE_SHUFFLE_PARTITIONS_ENABLED.key}' " +
+        "are both true.")
       .version("3.0.0")
       .intConf
-      .checkValue(_ > 0, "The maximum shuffle partition number " +
-        "must be a positive integer.")
+      .checkValue(_ > 0, "The initial number of partitions must be positive.")
       .createOptional
 
+  val FETCH_SHUFFLE_BLOCKS_IN_BATCH =
+    buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch")
+      .internal()
+      .doc("Whether to fetch the contiguous shuffle blocks in batch. Instead of fetching blocks " +
+        "one by one, fetching contiguous shuffle blocks for the same map task in batch can " +
+        "reduce IO and improve performance. Note, multiple contiguous blocks exist in single " +
+        s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+        s"'${COALESCE_SHUFFLE_PARTITIONS_ENABLED.key}' are both true. This feature also depends " +
+        "on a relocatable serializer, the concatenation support codec in use and the new version " +
+        "shuffle fetch protocol.")
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val LOCAL_SHUFFLE_READER_ENABLED =
-    buildConf("spark.sql.adaptive.shuffle.localShuffleReader.enabled")
-    .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables the " +
-      "optimization of converting the shuffle reader to local shuffle reader for the shuffle " +
-      "exchange of the broadcast hash join in probe side.")
-    .version("3.0.0")
-    .booleanConf
-    .createWithDefault(true)
+    buildConf("spark.sql.adaptive.localShuffleReader.enabled")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark tries to use local " +
+        "shuffle reader to read the shuffle data of the probe side of a broadcast-hash join.")
 
 Review comment:
   local shuffle reader may optimize the local reader both build side and probe side?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org