You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2020/11/23 21:17:40 UTC

[spark] branch master updated: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bd897c  [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle
1bd897c is described below

commit 1bd897cbc4fe30eb8b7740c7232aae87081e8e33
Author: Ye Zhou <ye...@linkedin.com>
AuthorDate: Mon Nov 23 15:16:20 2020 -0600

    [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle
    
    ### What changes were proposed in this pull request?
    This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
    Summary of changes:
    This PR introduces a new RPC to be called within Driver. When the expected shuffle push wait time reaches, Driver will call this RPC to facilitate coordination of shuffle map/reduce stages and notify external shuffle services to finalize shuffle block merge for a given shuffle. Shuffle services also respond back the metadata about a merged shuffle partition back to the caller.
    
    ### Why are the changes needed?
    Refer to the SPIP in SPARK-30602.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    This code snippets won't be called by any existing code and will be tested after the coordinated driver changes gets merged in SPARK-32920.
    
    Lead-authored-by: Min Shen mshenlinkedin.com
    
    Closes #30163 from zhouyejoe/SPARK-32918.
    
    Lead-authored-by: Ye Zhou <ye...@linkedin.com>
    Co-authored-by: Min Shen <ms...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../spark/network/shuffle/BlockStoreClient.java    | 22 +++++++++++
 .../network/shuffle/ExternalBlockStoreClient.java  | 29 +++++++++++++++
 .../network/shuffle/MergeFinalizerListener.java    | 43 ++++++++++++++++++++++
 3 files changed, 94 insertions(+)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
index 37befcd..a6bdc13 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -147,6 +147,8 @@ public abstract class BlockStoreClient implements Closeable {
    * @param blockIds block ids to be pushed
    * @param buffers buffers to be pushed
    * @param listener the listener to receive block push status.
+   *
+   * @since 3.1.0
    */
   public void pushBlocks(
       String host,
@@ -156,4 +158,24 @@ public abstract class BlockStoreClient implements Closeable {
       BlockFetchingListener listener) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
+   * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+   * finishing the shuffle merge process associated with the shuffle mapper stage.
+   *
+   * @param host host of shuffle server
+   * @param port port of shuffle server.
+   * @param shuffleId shuffle ID of the shuffle to be finalized
+   * @param listener the listener to receive MergeStatuses
+   *
+   * @since 3.1.0
+   */
+  public void finalizeShuffleMerge(
+      String host,
+      int port,
+      int shuffleId,
+      MergeFinalizerListener listener) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index eca35ed..56c06e6 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -159,6 +159,35 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   }
 
   @Override
+  public void finalizeShuffleMerge(
+      String host,
+      int port,
+      int shuffleId,
+      MergeFinalizerListener listener) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
+      client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          listener.onShuffleMergeSuccess(
+            (MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
+        }
+
+        @Override
+        public void onFailure(Throwable e) {
+          listener.onShuffleMergeFailure(e);
+        }
+      });
+    } catch (Exception e) {
+      logger.error("Exception while sending finalizeShuffleMerge request to {}:{}",
+        host, port, e);
+      listener.onShuffleMergeFailure(e);
+    }
+  }
+
+  @Override
   public MetricSet shuffleMetrics() {
     checkInit();
     return clientFactory.getAllMetrics();
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
new file mode 100644
index 0000000..08e13ee
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Listener providing a callback function to invoke when driver receives the response for the
+ * finalize shuffle merge request sent to remote shuffle service.
+ *
+ * @since 3.1.0
+ */
+public interface MergeFinalizerListener extends EventListener {
+  /**
+   * Called once upon successful response on finalize shuffle merge on a remote shuffle service.
+   * The returned {@link MergeStatuses} is passed to the listener for further processing
+   */
+  void onShuffleMergeSuccess(MergeStatuses statuses);
+
+  /**
+   * Called once upon failure response on finalize shuffle merge on a remote shuffle service.
+   */
+  void onShuffleMergeFailure(Throwable e);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org