You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2020/11/23 21:17:40 UTC
[spark] branch master updated: [SPARK-32918][SHUFFLE] RPC
implementation to support control plane coordination for push-based shuffle
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1bd897c [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle
1bd897c is described below
commit 1bd897cbc4fe30eb8b7740c7232aae87081e8e33
Author: Ye Zhou <ye...@linkedin.com>
AuthorDate: Mon Nov 23 15:16:20 2020 -0600
[SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle
### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
Summary of changes:
This PR introduces a new RPC to be called from the Driver. When the expected shuffle push wait time is reached, the Driver will call this RPC to facilitate coordination of the shuffle map/reduce stages and to notify the external shuffle services to finalize the shuffle block merge for a given shuffle. The shuffle services also respond to the caller with the metadata about the merged shuffle partitions.
### Why are the changes needed?
Refer to the SPIP in SPARK-30602.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
These code snippets won't be called by any existing code and will be tested after the coordinated driver changes get merged in SPARK-32920.
Lead-authored-by: Min Shen <ms...@linkedin.com>
Closes #30163 from zhouyejoe/SPARK-32918.
Lead-authored-by: Ye Zhou <ye...@linkedin.com>
Co-authored-by: Min Shen <ms...@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../spark/network/shuffle/BlockStoreClient.java | 22 +++++++++++
.../network/shuffle/ExternalBlockStoreClient.java | 29 +++++++++++++++
.../network/shuffle/MergeFinalizerListener.java | 43 ++++++++++++++++++++++
3 files changed, 94 insertions(+)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
index 37befcd..a6bdc13 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -147,6 +147,8 @@ public abstract class BlockStoreClient implements Closeable {
* @param blockIds block ids to be pushed
* @param buffers buffers to be pushed
* @param listener the listener to receive block push status.
+ *
+ * @since 3.1.0
*/
public void pushBlocks(
String host,
@@ -156,4 +158,24 @@ public abstract class BlockStoreClient implements Closeable {
BlockFetchingListener listener) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
+ * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+ * finishing the shuffle merge process associated with the shuffle mapper stage.
+ *
+ * @param host host of shuffle server
+ * @param port port of shuffle server.
+ * @param shuffleId shuffle ID of the shuffle to be finalized
+ * @param listener the listener to receive MergeStatuses
+ *
+ * @since 3.1.0
+ */
+ public void finalizeShuffleMerge(
+ String host,
+ int port,
+ int shuffleId,
+ MergeFinalizerListener listener) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index eca35ed..56c06e6 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -159,6 +159,35 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
}
@Override
+ public void finalizeShuffleMerge(
+ String host,
+ int port,
+ int shuffleId,
+ MergeFinalizerListener listener) {
+ checkInit();
+ try {
+ TransportClient client = clientFactory.createClient(host, port);
+ ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
+ client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
+ @Override
+ public void onSuccess(ByteBuffer response) {
+ listener.onShuffleMergeSuccess(
+ (MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ listener.onShuffleMergeFailure(e);
+ }
+ });
+ } catch (Exception e) {
+ logger.error("Exception while sending finalizeShuffleMerge request to {}:{}",
+ host, port, e);
+ listener.onShuffleMergeFailure(e);
+ }
+ }
+
+ @Override
public MetricSet shuffleMetrics() {
checkInit();
return clientFactory.getAllMetrics();
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
new file mode 100644
index 0000000..08e13ee
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Listener providing a callback function to invoke when driver receives the response for the
+ * finalize shuffle merge request sent to remote shuffle service.
+ *
+ * @since 3.1.0
+ */
+public interface MergeFinalizerListener extends EventListener {
+ /**
+ * Called once upon successful response on finalize shuffle merge on a remote shuffle service.
+ * The returned {@link MergeStatuses} is passed to the listener for further processing
+ */
+ void onShuffleMergeSuccess(MergeStatuses statuses);
+
+ /**
+ * Called once upon failure response on finalize shuffle merge on a remote shuffle service.
+ */
+ void onShuffleMergeFailure(Throwable e);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org