Posted to commits@spark.apache.org by mr...@apache.org on 2021/08/16 15:26:03 UTC

[spark] branch branch-3.2 updated: [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 233af3d  [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation
233af3d is described below

commit 233af3d2396391185c45b69c4bd2bf9ca8f1af67
Author: Venkata krishnan Sowrirajan <vs...@linkedin.com>
AuthorDate: Mon Aug 16 10:24:40 2021 -0500

    [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation
    
    ### What changes were proposed in this pull request?
    
    Document the push-based shuffle feature with a high-level overview of the feature and the corresponding configuration options for both the shuffle server side and the client side. This is how the doc changes look in the browser ([img](https://user-images.githubusercontent.com/8871522/129231582-ad86ee2f-246f-4b42-9528-4ccd693e86d2.png))
    
    ### Why are the changes needed?
    
    Helps users understand the feature
    
    ### Does this PR introduce _any_ user-facing change?
    
    Docs
    
    ### How was this patch tested?
    
    N/A
    
    Closes #33615 from venkata91/SPARK-36374.
    
    Authored-by: Venkata krishnan Sowrirajan <vs...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 2270ecf32f7ae478570145219d2ce71a642076cf)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
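
    As a rough sketch of how the documented client-side configs fit together
    (an editorial illustration, not part of this patch; it assumes a YARN
    cluster whose external shuffle service already sets
    spark.shuffle.push.server.mergedShuffleFileManagerImpl to
    org.apache.spark.network.shuffle.RemoteBlockPushResolver):

        import org.apache.spark.SparkConf
        import org.apache.spark.sql.SparkSession

        // Client-side enablement; the push.* keys are the ones documented in
        // this patch, and the values shown are simply their defaults.
        val conf = new SparkConf()
          .set("spark.shuffle.service.enabled", "true") // external shuffle service is required
          .set("spark.shuffle.push.enabled", "true")
          .set("spark.shuffle.push.finalize.timeout", "10s")
          .set("spark.shuffle.push.maxBlockSizeToPush", "1m")

        val spark = SparkSession.builder().config(conf).getOrCreate()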
---
 .../apache/spark/network/util/TransportConf.java   |  28 ++++--
 .../shuffle/RemoteBlockPushResolverSuite.java      |   2 +-
 .../org/apache/spark/internal/config/package.scala |  63 ++++++------
 docs/configuration.md                              | 106 +++++++++++++++++++++
 4 files changed, 157 insertions(+), 42 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 69b8b25..ed0ca918 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -390,24 +390,32 @@ public class TransportConf {
   /**
    * The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during
    * push-based shuffle.
-   * A merged shuffle file consists of multiple small shuffle blocks. Fetching the
-   * complete merged shuffle file in a single response increases the memory requirements for the
-   * clients. Instead of serving the entire merged file, the shuffle service serves the
-   * merged file in `chunks`. A `chunk` constitutes few shuffle blocks in entirety and this
-   * configuration controls how big a chunk can get. A corresponding index file for each merged
-   * shuffle file will be generated indicating chunk boundaries.
+   * A merged shuffle file consists of multiple small shuffle blocks. Fetching the complete
+   * merged shuffle file in a single disk I/O increases the memory requirements for both the
+   * clients and the external shuffle service. Instead, the external shuffle service serves
+   * the merged file in MB-sized chunks. This configuration controls how big a chunk can get.
+   * A corresponding index file for each merged shuffle file will be generated indicating chunk
+   * boundaries.
+   *
+   * Setting this too high would increase the memory requirements on both the clients and the
+   * external shuffle service.
+   *
+   * Setting this too low would increase the overall number of RPC requests to external shuffle
+   * service unnecessarily.
    */
   public int minChunkSizeInMergedShuffleFile() {
     return Ints.checkedCast(JavaUtils.byteStringAsBytes(
-      conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
+      conf.get("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", "2m")));
   }
 
   /**
-   * The size of cache in memory which is used in push-based shuffle for storing merged index files.
+   * The maximum size of cache in memory which is used in push-based shuffle for storing merged
+   * index files. This cache is in addition to the one configured via
+   * spark.shuffle.service.index.cache.size.
    */
   public long mergedIndexCacheSize() {
     return JavaUtils.byteStringAsBytes(
-      conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
+      conf.get("spark.shuffle.push.server.mergedIndexCacheSize", "100m"));
   }
 
   /**
@@ -417,7 +425,7 @@ public class TransportConf {
    * blocks for this shuffle partition.
    */
   public int ioExceptionsThresholdDuringMerge() {
-    return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
+    return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
 
   /**
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 46d6366..d7881f0 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -87,7 +87,7 @@ public class RemoteBlockPushResolverSuite {
   public void before() throws IOException {
     localDirs = createLocalDirs(2);
     MapConfigProvider provider = new MapConfigProvider(
-      ImmutableMap.of("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "4"));
+      ImmutableMap.of("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", "4"));
     conf = new TransportConf("shuffle", provider);
     pushResolver = new RemoteBlockPushResolver(conf);
     registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META);
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 17c585d..7ed1f1d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2097,31 +2097,33 @@ package object config {
 
   private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
     ConfigBuilder("spark.shuffle.push.enabled")
-      .doc("Set to 'true' to enable push-based shuffle on the client side and this works in " +
+      .doc("Set to true to enable push-based shuffle on the client side and this works in " +
         "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " +
         "which needs to be set with the appropriate " +
         "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " +
         "shuffle to be enabled")
-      .version("3.1.0")
+      .version("3.2.0")
       .booleanConf
       .createWithDefault(false)
 
   private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
-    ConfigBuilder("spark.shuffle.push.merge.results.timeout")
-      .doc("Specify the max amount of time DAGScheduler waits for the merge results from " +
-        "all remote shuffle services for a given shuffle. DAGScheduler will start to submit " +
-        "following stages if not all results are received within the timeout.")
+    ConfigBuilder("spark.shuffle.push.results.timeout")
+      .internal()
+      .doc("The maximum amount of time driver waits in seconds for the merge results to be" +
+        " received from all remote external shuffle services for a given shuffle. Driver" +
+        " submits following stages if not all results are received within the timeout. Setting" +
+        " this too long could potentially lead to performance regression")
       .version("3.2.0")
       .timeConf(TimeUnit.SECONDS)
       .checkValue(_ >= 0L, "Timeout must be >= 0.")
       .createWithDefaultString("10s")
 
   private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT =
-    ConfigBuilder("spark.shuffle.push.merge.finalize.timeout")
-      .doc("Specify the amount of time DAGScheduler waits after all mappers finish for " +
-        "a given shuffle map stage before it starts sending merge finalize requests to " +
-        "remote shuffle services. This allows the shuffle services some extra time to " +
-        "merge as many blocks as possible.")
+    ConfigBuilder("spark.shuffle.push.finalize.timeout")
+      .doc("The amount of time driver waits, after all mappers have finished for a given" +
+        " shuffle map stage, before it sends merge finalize requests to remote external shuffle" +
+        " services. This gives the external shuffle services extra time to merge blocks. Setting" +
+        " this too long could potentially lead to performance regression")
       .version("3.2.0")
       .timeConf(TimeUnit.SECONDS)
       .checkValue(_ >= 0L, "Timeout must be >= 0.")
@@ -2129,54 +2131,53 @@ package object config {
 
   private[spark] val SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS =
     ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations")
-      .doc("Maximum number of shuffle push merger locations cached for push based shuffle. " +
-        "Currently, shuffle push merger locations are nothing but external shuffle services " +
-        "which are responsible for handling pushed blocks and merging them and serving " +
-        "merged blocks for later shuffle fetch.")
-      .version("3.1.0")
+      .doc("Maximum number of merger locations cached for push-based shuffle. Currently, merger" +
+        " locations are hosts of external shuffle services responsible for handling pushed" +
+        " blocks, merging them and serving merged blocks for later shuffle fetch.")
+      .version("3.2.0")
       .intConf
       .createWithDefault(500)
 
   private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
     ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
-      .doc("The minimum number of shuffle merger locations required to enable push based " +
-        "shuffle for a stage. This is specified as a ratio of the number of partitions in " +
-        "the child stage. For example, a reduce stage which has 100 partitions and uses the " +
-        "default value 0.05 requires at least 5 unique merger locations to enable push based " +
-        "shuffle. Merger locations are currently defined as external shuffle services.")
-      .version("3.1.0")
+      .doc("Ratio used to compute the minimum number of shuffle merger locations required for" +
+        " a stage based on the number of partitions for the reducer stage. For example, a reduce" +
+        " stage which has 100 partitions and uses the default value 0.05 requires at least 5" +
+        " unique merger locations to enable push-based shuffle. Merger locations are currently" +
+        " defined as external shuffle services.")
+      .version("3.2.0")
       .doubleConf
       .createWithDefault(0.05)
 
   private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
     ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
       .doc(s"The static threshold for number of shuffle push merger locations should be " +
-        "available in order to enable push based shuffle for a stage. Note this config " +
+        "available in order to enable push-based shuffle for a stage. Note this config " +
         s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " +
         "Maximum of spark.shuffle.push.mergersMinStaticThreshold and " +
         s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of mergers needed to " +
-        "enable push based shuffle for a stage. For eg: with 1000 partitions for the child " +
+        "enable push-based shuffle for a stage. For eg: with 1000 partitions for the child " +
         "stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " +
         s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need " +
-        "at least 50 mergers to enable push based shuffle for that stage.")
-      .version("3.1.0")
+        "at least 50 mergers to enable push-based shuffle for that stage.")
+      .version("3.2.0")
       .intConf
       .createWithDefault(5)
 
   private[spark] val SHUFFLE_NUM_PUSH_THREADS =
     ConfigBuilder("spark.shuffle.push.numPushThreads")
       .doc("Specify the number of threads in the block pusher pool. These threads assist " +
-        "in creating connections and pushing blocks to remote shuffle services. By default, the " +
-        "threadpool size is equal to the number of spark executor cores.")
+        "in creating connections and pushing blocks to remote external shuffle services. By" +
+        " default, the threadpool size is equal to the number of spark executor cores.")
       .version("3.2.0")
       .intConf
       .createOptional
 
   private[spark] val SHUFFLE_MAX_BLOCK_SIZE_TO_PUSH =
     ConfigBuilder("spark.shuffle.push.maxBlockSizeToPush")
-      .doc("The max size of an individual block to push to the remote shuffle services. Blocks " +
-       "larger than this threshold are not pushed to be merged remotely. These shuffle blocks " +
-       "will be fetched by the executors in the original manner.")
+      .doc("The max size of an individual block to push to the remote external shuffle services." +
+        " Blocks larger than this threshold are not pushed to be merged remotely. These shuffle" +
+        " blocks will be fetched by the executors in the original manner.")
       .version("3.2.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1m")
diff --git a/docs/configuration.md b/docs/configuration.md
index a4fdc4c..770c8b4 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3152,3 +3152,109 @@ The stage level scheduling feature allows users to specify task and executor res
 This is only available for the RDD API in Scala, Java, and Python.  It is available on YARN and Kubernetes when dynamic allocation is enabled. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page for more implementation details.
 
 See the `RDD.withResources` and `ResourceProfileBuilder` APIs for using this feature. The current implementation acquires new executors for each `ResourceProfile` created and currently has to be an exact match. Spark does not try to fit tasks into an executor that requires a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage.
+
+# Push-based shuffle overview
+
+Push-based shuffle helps improve the reliability and performance of Spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, which turns the small random disk reads performed by external shuffle services into large sequential reads. The possibility of better data locality for reduce tasks additionally helps minimize network I/O.
+
+<p> Push-based shuffle improves performance for long-running jobs/queries that involve a large amount of disk I/O during shuffle. Currently it is not well suited for jobs/queries which run quickly and deal with smaller amounts of shuffle data. This will be further improved in future releases.</p>
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN with external shuffle service. </b></p>
+
+### External shuffle service (server side) configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
+<tr>
+  <td><code>spark.shuffle.push.server.mergedShuffleFileManagerImpl</code></td>
+  <td>
+    <code>org.apache.spark.network.shuffle.<br />NoOpMergedShuffleFileManager</code>
+  </td>
+  <td>
+    Class name of the implementation of <code>MergedShuffleFileManager</code> that manages push-based shuffle. This acts as a server side config to disable or enable push-based shuffle. By default, push-based shuffle is disabled at the server side. <p> To enable push-based shuffle on the server side, set this config to <code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code>.</p>
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.server.minChunkSizeInMergedShuffleFile</code></td>
+  <td><code>2m</code></td>
+  <td>
+    <p> The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during push-based shuffle. A merged shuffle file consists of multiple small shuffle blocks. Fetching the complete merged shuffle file in a single disk I/O increases the memory requirements for both the clients and the external shuffle services. Instead, the external shuffle service serves the merged file in <code>MB-sized chunks</code>.<br /> This configuration controls how big a chunk can get. A [...]
+    <p> Setting this too high would increase the memory requirements on both the clients and the external shuffle service. </p>
+    <p> Setting this too low would increase the overall number of RPC requests to external shuffle service unnecessarily.</p>
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.server.mergedIndexCacheSize</code></td>
+  <td><code>100m</code></td>
+  <td>
+    The maximum size of the in-memory cache that is used in push-based shuffle for storing merged index files. This cache is in addition to the one configured via <code>spark.shuffle.service.index.cache.size</code>.
+  </td>
+  <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
+<tr>
+  <td><code>spark.shuffle.push.enabled</code></td>
+  <td><code>false</code></td>
+  <td>
+    Set to true to enable push-based shuffle on the client side. This works in conjunction with the server side flag <code>spark.shuffle.push.server.mergedShuffleFileManagerImpl</code>.
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.finalize.timeout</code></td>
+  <td><code>10s</code></td>
+  <td>
+    The amount of time in seconds that the driver waits, after all mappers have finished for a given shuffle map stage, before it sends merge finalize requests to remote external shuffle services. This gives the external shuffle services extra time to merge blocks. Setting this too long could potentially lead to performance regression.
+  </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.maxRetainedMergerLocations</code></td>
+  <td><code>500</code></td>
+  <td>
+    Maximum number of merger locations cached for push-based shuffle. Currently, merger locations are hosts of external shuffle services responsible for handling pushed blocks, merging them and serving merged blocks for later shuffle fetch.
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.mergersMinThresholdRatio</code></td>
+  <td><code>0.05</code></td>
+  <td>
+    Ratio used to compute the minimum number of shuffle merger locations required for a stage based on the number of partitions for the reducer stage. For example, a reduce stage which has 100 partitions and uses the default value 0.05 requires at least 5 unique merger locations to enable push-based shuffle.
+  </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.mergersMinStaticThreshold</code></td>
+  <td><code>5</code></td>
+  <td>
+    The static threshold for the number of shuffle push merger locations that should be available in order to enable push-based shuffle for a stage. Note this config works in conjunction with <code>spark.shuffle.push.mergersMinThresholdRatio</code>: the maximum of <code>spark.shuffle.push.mergersMinStaticThreshold</code> and the number of mergers computed from the <code>spark.shuffle.push.mergersMinThresholdRatio</code> ratio is needed to enable push-based shuffle for a stage. For example: with 1000 partitions for the child stage, <code>spark.shuffle.push.mergersMinStaticThreshold</code> as 5 and <code>spark.shuffle.push.mergersMinThresholdRatio</code> set to 0.05, we would need at least 50 mergers to enable push-based shuffle for that stage.
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.maxBlockSizeToPush</code></td>
+  <td><code>1m</code></td>
+  <td>
+    <p> The max size of an individual block to push to the remote external shuffle services. Blocks larger than this threshold are not pushed to be merged remotely. These shuffle blocks will be fetched in the original manner. </p>
+    <p> Setting this too high would result in more blocks to be pushed to remote external shuffle services but those are already efficiently fetched with the existing mechanisms resulting in additional overhead of pushing the large blocks to remote external shuffle services. It is recommended to set <code>spark.shuffle.push.maxBlockSizeToPush</code> lesser than <code>spark.shuffle.push.maxBlockBatchSize</code> config's value. </p>
+    <p> Setting this too low would result in lesser number of blocks getting merged and directly fetched from mapper external shuffle service results in higher small random reads affecting overall disk I/O performance. </p>
+  </td>
+  <td>3.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.push.maxBlockBatchSize</code></td>
+  <td><code>3m</code></td>
+  <td>
+    The max size of a batch of shuffle blocks to be grouped into a single push request. The default is set to <code>3m</code> in order to keep it slightly higher than the <code>spark.storage.memoryMapThreshold</code> default of <code>2m</code>, as it is very likely that each batch of blocks gets memory mapped, which incurs higher overhead.
+  </td>
+  <td>3.2.0</td>
+</tr>
+</table>
