Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/23 19:14:54 UTC

[GitHub] [spark] viirya commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

viirya commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r459671216



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -61,4 +78,62 @@ public MetricSet shuffleMetrics() {
     // Return an empty MetricSet by default.
     return () -> Collections.emptyMap();
   }
+
+  /**
+   * Request the local disk directories for executors which are located at the same host with
+   * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
+   *
+   *
+   * @param host the host of BlockManager or ExternalShuffleService. It's the same with current
+   *             BlockStoreClient.
+   * @param port the port of BlockManager or ExternalShuffleService.
+   * @param execIds a collection of executor Ids, which specifies the target executors that we
+   *                want to get their local directories. There could be multiple executor Ids if
+   *                BlockStoreClient is implemented by ExternalBlockStoreClient since the request
+   *                handler, ExternalShuffleService, can serve multiple executors on the same node.
+   *                Or, only one executor Id if BlockStoreClient is implemented by
+   *                NettyBlockTransferService.
+   * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id to its
+   *                                 local directories if the request handler replies successfully.
+   *                                 Otherwise, it contains a specific error.
+   */
+  public void getHostLocalDirs(
+      String host,
+      int port,
+      String[] execIds,
+      CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
+    assert appId != null : "Called before init()";
+    GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds);
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          try {
+            BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
+            hostLocalDirsCompletable.complete(
+                    ((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
+          } catch (Throwable t) {
+            logger.warn("Error trying to get the host local dirs for " +
+                            Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service",

Review comment:
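       "via external shuffle service" -> "via BlockStoreClient"? Per the Javadoc above, the client can also be NettyBlockTransferService, so the request does not necessarily go through the external shuffle service.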

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -61,4 +78,62 @@ public MetricSet shuffleMetrics() {
     // Return an empty MetricSet by default.
     return () -> Collections.emptyMap();
   }
+
+  /**
+   * Request the local disk directories for executors which are located at the same host with
+   * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
+   *
+   *
+   * @param host the host of BlockManager or ExternalShuffleService. It's the same with current
+   *             BlockStoreClient.
+   * @param port the port of BlockManager or ExternalShuffleService.
+   * @param execIds a collection of executor Ids, which specifies the target executors that we
+   *                want to get their local directories. There could be multiple executor Ids if
+   *                BlockStoreClient is implemented by ExternalBlockStoreClient since the request
+   *                handler, ExternalShuffleService, can serve multiple executors on the same node.
+   *                Or, only one executor Id if BlockStoreClient is implemented by
+   *                NettyBlockTransferService.
+   * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id to its
+   *                                 local directories if the request handler replies successfully.
+   *                                 Otherwise, it contains a specific error.
+   */
+  public void getHostLocalDirs(
+      String host,
+      int port,
+      String[] execIds,
+      CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
+    assert appId != null : "Called before init()";
+    GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds);
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          try {
+            BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
+            hostLocalDirsCompletable.complete(
+                    ((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
+          } catch (Throwable t) {
+            logger.warn("Error trying to get the host local dirs for " +
+                            Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service",
+                    t.getCause());
+            hostLocalDirsCompletable.completeExceptionally(t);
+          } finally {
+            client.close();
+          }
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+          logger.warn("Error trying to get the host local dirs for " +
+            Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service",

Review comment:
       Ditto: "via external shuffle service" -> "via BlockStoreClient"?
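
A minimal sketch of how the suggested wording could be kept consistent across both callback paths. This is only an illustration under stated assumptions: `HostLocalDirsSketch`, `errorMessage`, and the simplified `onSuccess`/`onFailure` signatures are hypothetical stand-ins and are not part of Spark or of this PR; printing to stderr stands in for `logger.warn`, and a `null` map stands in for a response that fails to decode.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for illustration only; not Spark code.
final class HostLocalDirsSketch {

  // Hypothetical helper: builds the client-neutral message in one place so the
  // success-path catch block and the failure callback cannot drift apart.
  static String errorMessage(String[] execIds) {
    return "Error trying to get the host local dirs for "
        + Arrays.toString(execIds) + " via BlockStoreClient";
  }

  // Simplified stand-in for the onSuccess path: a null map models a response
  // that could not be decoded.
  static void onSuccess(
      String[] execIds,
      Map<String, String[]> decoded,
      CompletableFuture<Map<String, String[]>> result) {
    try {
      if (decoded == null) {
        throw new IllegalStateException("could not decode response");
      }
      result.complete(decoded);
    } catch (Throwable t) {
      System.err.println(errorMessage(execIds) + ": " + t);
      result.completeExceptionally(t);
    }
  }

  // Simplified stand-in for the onFailure path: same message, same completion style.
  static void onFailure(
      String[] execIds,
      Throwable t,
      CompletableFuture<Map<String, String[]>> result) {
    System.err.println(errorMessage(execIds) + ": " + t);
    result.completeExceptionally(t);
  }

  public static void main(String[] args) {
    String[] execIds = {"1", "2"};

    // Success: the future carries the executor-id -> local-dirs map.
    Map<String, String[]> dirs = new HashMap<>();
    dirs.put("1", new String[] {"/tmp/a"});
    CompletableFuture<Map<String, String[]>> ok = new CompletableFuture<>();
    onSuccess(execIds, dirs, ok);
    System.out.println("completed normally: " + !ok.isCompletedExceptionally());

    // Failure: the future carries the error instead.
    CompletableFuture<Map<String, String[]>> failed = new CompletableFuture<>();
    onFailure(execIds, new RuntimeException("connection refused"), failed);
    System.out.println("completed exceptionally: " + failed.isCompletedExceptionally());
  }
}
```

Sharing one message builder is just one way to address both comments at once; editing the two string literals in place would work equally well.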



