Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/27 22:56:00 UTC

[GitHub] [spark] Victsm commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Victsm commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r513080596



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -158,6 +158,42 @@ public void pushBlocks(
     }
   }
 
+  /**
+   * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
+   * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+   * finishing the shuffle merge process associated with the shuffle mapper stage.
+   *
+   * @param host host of shuffle server
+   * @param port port of shuffle server.
+   * @param shuffleId shuffle ID of the shuffle to be finalized
+   * @param listener the listener to receive MergeStatuses
+   */
+  public void finalizeShuffleMerge(

Review comment:
       The part we want community input on is whether to put this API inside `ExternalBlockStoreClient` or in a separate client.
   This RPC will be used by the Spark driver (DAGScheduler) to finalize the shuffle merge when it decides to do so and to retrieve the corresponding `MergeStatuses` from individual shuffle services for a given shuffle.
   Right now, we have to initialize an `ExternalBlockStoreClient` on the DAGScheduler side with some dummy parameters, like the following:
   ```scala
     private lazy val externalShuffleClient: Option[ExternalShuffleClient] =
       if (pushBasedShuffleEnabled) {
         val transConf = SparkTransportConf.fromSparkConf(sc.conf, "shuffle", 1)
         val shuffleClient = new ExternalShuffleClient(transConf, env.securityManager,
           env.securityManager.isAuthenticationEnabled(),
           sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
         shuffleClient.init(sc.conf.getAppId)
         Some(shuffleClient)
       } else {
         None
       }
   ```
   
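To make the "separate client" option concrete, here is a minimal, self-contained sketch of what a narrow driver-facing interface could look like. Every type name below (`ShuffleMergeFinalizer`, `MergeFinalizerListenerSketch`, `MergeStatusesStub`, `FakeShuffleMergeFinalizer`) is hypothetical and is not part of this PR or of Spark; only the `finalizeShuffleMerge(host, port, shuffleId, listener)` shape is taken from the Javadoc in the diff above. The in-memory fake stands in for the real RPC, so this only illustrates how the DAGScheduler could depend on a small interface instead of a fully initialized block store client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the MergeStatuses message; its real fields are not shown in this thread.
final class MergeStatusesStub {
  final int shuffleId;

  MergeStatusesStub(int shuffleId) {
    this.shuffleId = shuffleId;
  }
}

// Hypothetical callback interface; the method names are assumptions.
interface MergeFinalizerListenerSketch {
  void onSuccess(MergeStatusesStub statuses);

  void onFailure(Throwable e);
}

// Hypothetical narrow interface the driver would depend on instead of the full client.
interface ShuffleMergeFinalizer {
  void finalizeShuffleMerge(
      String host, int port, int shuffleId, MergeFinalizerListenerSketch listener);
}

// In-memory fake that replaces the network call so the sketch runs on its own.
final class FakeShuffleMergeFinalizer implements ShuffleMergeFinalizer {
  @Override
  public void finalizeShuffleMerge(
      String host, int port, int shuffleId, MergeFinalizerListenerSketch listener) {
    if (port <= 0) {
      listener.onFailure(new IllegalArgumentException("invalid port: " + port));
    } else {
      listener.onSuccess(new MergeStatusesStub(shuffleId));
    }
  }
}

final class MergeFinalizerSketch {
  // Driver-side helper: asks one shuffle service to finalize and records what the listener saw.
  static List<String> finalizeOn(
      ShuffleMergeFinalizer finalizer, String host, int port, int shuffleId) {
    List<String> events = new ArrayList<>();
    finalizer.finalizeShuffleMerge(host, port, shuffleId, new MergeFinalizerListenerSketch() {
      @Override
      public void onSuccess(MergeStatusesStub statuses) {
        events.add("merged shuffle " + statuses.shuffleId);
      }

      @Override
      public void onFailure(Throwable e) {
        events.add("failed: " + e.getMessage());
      }
    });
    return events;
  }

  public static void main(String[] args) {
    ShuffleMergeFinalizer finalizer = new FakeShuffleMergeFinalizer();
    System.out.println(finalizeOn(finalizer, "host-a", 7337, 5));
    System.out.println(finalizeOn(finalizer, "host-b", -1, 5));
  }
}
```

With an interface of this shape, the driver side would only need something that implements `finalizeShuffleMerge`. Whether that is the existing `ExternalBlockStoreClient` or a new, lighter client is exactly the open question here.
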
   CC @Ngone51 @jiangxb1987 @attilapiros @tgravescs 



