Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/27 22:35:50 UTC

[GitHub] [spark] zhouyejoe opened a new pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

zhouyejoe opened a new pull request #30163:
URL: https://github.com/apache/spark/pull/30163


   ### What changes were proposed in this pull request?
   This is one of the patches for SPIP SPARK-30602, which is needed for push-based shuffle.
   Summary of changes:
   This PR introduces a new RPC that is called from the driver. Once the expected shuffle push wait time has elapsed, the driver calls this RPC to coordinate the shuffle map and reduce stages and to notify the external shuffle services to finalize the shuffle block merge for a given shuffle. The shuffle services respond with metadata about the merged shuffle partitions.
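A minimal sketch of the driver-side fan-out described above, assuming simplified stand-in types. The `MergeStatuses` class here, the `ShuffleServiceClient` interface and the `MergeCoordinator.finalizeAll` helper are hypothetical and much simpler than Spark's real classes; only the two listener callbacks mirror the `MergeFinalizerListener` interface in this PR. The driver issues one finalize request per shuffle service location and records each service's response or failure.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the metadata a shuffle service returns
// about the partitions it merged (the real MergeStatuses class differs).
final class MergeStatuses {
  final int shuffleId;
  final int[] reduceIds;
  final long[] sizes;

  MergeStatuses(int shuffleId, int[] reduceIds, long[] sizes) {
    this.shuffleId = shuffleId;
    this.reduceIds = reduceIds;
    this.sizes = sizes;
  }
}

// Callback contract mirroring the listener added in this PR.
interface MergeFinalizerListener {
  void onShuffleMergeSuccess(MergeStatuses statuses);
  void onShuffleMergeFailure(Throwable e);
}

// Hypothetical client abstraction: one finalize call per shuffle service.
interface ShuffleServiceClient {
  void finalizeShuffleMerge(String host, int port, int shuffleId, MergeFinalizerListener listener);
}

final class MergeCoordinator {
  // Sends one finalize request per "host:port" location. Each value in the
  // returned map is either the MergeStatuses or the Throwable for that location.
  static Map<String, Object> finalizeAll(
      ShuffleServiceClient client, List<String> locations, int shuffleId) {
    Map<String, Object> results = new ConcurrentHashMap<>();
    for (String location : locations) {
      int sep = location.lastIndexOf(':');
      String host = location.substring(0, sep);
      int port = Integer.parseInt(location.substring(sep + 1));
      client.finalizeShuffleMerge(host, port, shuffleId, new MergeFinalizerListener() {
        @Override
        public void onShuffleMergeSuccess(MergeStatuses statuses) {
          results.put(location, statuses);
        }

        @Override
        public void onShuffleMergeFailure(Throwable e) {
          results.put(location, e);
        }
      });
    }
    // Returns immediately; with an asynchronous client the map fills in later.
    return results;
  }
}
```

With a synchronous client the returned map is complete when `finalizeAll` returns; with a real asynchronous RPC client the caller would additionally need to wait for the callbacks.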
   
   ### Why are the changes needed?
   Refer to the SPIP in SPARK-30602.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   This code is not called by any existing code path yet; it will be tested once the driver-side coordination changes in SPARK-32920 are merged.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #30163:
URL: https://github.com/apache/spark/pull/30163#issuecomment-732430692


   Thanks for the +1, @tgravescs! The Scala 2.13 error is unrelated.
   Merging to master.




[GitHub] [spark] Victsm commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r523709402



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Listener providing a callback function to invoke when driver receives the response for the finalize

Review comment:
       Please fix the line-length issue reported by the Java linter.






[GitHub] [spark] mridulm commented on pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #30163:
URL: https://github.com/apache/spark/pull/30163#issuecomment-731174002


   @tgravescs, @attilapiros, @Ngone51, are there any other comments?
   Thanks.




[GitHub] [spark] zhouyejoe commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r527157009



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -158,6 +158,34 @@ public void pushBlocks(
     }
   }
 
+  @Override
+  public void finalizeShuffleMerge(
+      String host,
+      int port,
+      int shuffleId,
+      MergeFinalizerListener listener) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
+      client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          listener.onShuffleMergeSuccess(
+            (MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
+        }
+
+        @Override
+        public void onFailure(Throwable e) {
+          listener.onShuffleMergeFailure(e);
+        }
+      });
+    } catch (Exception e) {
+      logger.error("Exception while sending finalizeShuffleMerge request to {}:{}",

Review comment:
       Added the `onShuffleMergeFailure` call.






[GitHub] [spark] tgravescs commented on pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #30163:
URL: https://github.com/apache/spark/pull/30163#issuecomment-732254079


   Kicked the builds again, as the error should have been fixed by https://github.com/apache/spark/commit/6da8ade5f46cac69820ef0f6987806ffa78873f1




[GitHub] [spark] zhouyejoe commented on pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on pull request #30163:
URL: https://github.com/apache/spark/pull/30163#issuecomment-732468532


   Thanks, everyone, for reviewing this PR.




[GitHub] [spark] otterc commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
otterc commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r513075191



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+
+public interface MergeFinalizerListener extends EventListener {
+  /**
+   * Called once upon successful response on finalize shuffle merge on a remote shuffle service.
+   * The returned {@link MergeStatuses} is passed to the listener for further processing
+   */
+  void onShuffleMergeSuccess(MergeStatuses statuses);
+
+  /**
+   * Called once upon failure

Review comment:
       Nit: Complete this sentence.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+
+public interface MergeFinalizerListener extends EventListener {

Review comment:
       Nit: Please add Javadoc for this interface
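A sketch of what a documented version of the interface could look like, plus one hypothetical way a caller might consume it: adapting the two callbacks into a `CompletableFuture`, so that success and failure arrive through a single handle. The `MergeStatuses` stub and the `FutureMergeFinalizerListener` class are assumptions for illustration, not part of this PR; the interface Javadoc reuses the wording proposed later in this thread.

```java
import java.util.EventListener;
import java.util.concurrent.CompletableFuture;

// Simplified, hypothetical stand-in for the real MergeStatuses protocol message.
final class MergeStatuses {
  final int shuffleId;

  MergeStatuses(int shuffleId) {
    this.shuffleId = shuffleId;
  }
}

/**
 * Listener providing a callback function to invoke when the driver receives the
 * response for the finalize shuffle merge request sent to a remote shuffle service.
 */
interface MergeFinalizerListener extends EventListener {
  /** Called once upon a successful finalize response, with the returned statuses. */
  void onShuffleMergeSuccess(MergeStatuses statuses);

  /** Called once if the finalize request fails, with the cause of the failure. */
  void onShuffleMergeFailure(Throwable e);
}

// Hypothetical adapter exposing the callback outcome as a CompletableFuture.
final class FutureMergeFinalizerListener implements MergeFinalizerListener {
  private final CompletableFuture<MergeStatuses> future = new CompletableFuture<>();

  CompletableFuture<MergeStatuses> future() {
    return future;
  }

  @Override
  public void onShuffleMergeSuccess(MergeStatuses statuses) {
    future.complete(statuses);
  }

  @Override
  public void onShuffleMergeFailure(Throwable e) {
    future.completeExceptionally(e);
  }
}
```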






[GitHub] [spark] Victsm commented on pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
Victsm commented on pull request #30163:
URL: https://github.com/apache/spark/pull/30163#issuecomment-727691393


   @attilapiros thanks for that feedback; we acknowledge the concerns about splitting a large feature into many small patches.
   This was an oversight on my part when initially breaking SPARK-30602 into sub-tasks.
   I thought SPARK-32918 would be a reasonably sized patch, but that turned out not to be the case.
   Both SPARK-32918 and SPARK-32919 are meant to put some of the prerequisites in place before SPARK-32920 and SPARK-32921, which are relatively major changes to key Spark driver components, i.e. DAGScheduler and MapOutputTracker.
   The original intent was to keep the patches for SPARK-32920 and SPARK-32921 focused only on changes to those components, given the potentially vigorous reviews we would receive for changing them.




[GitHub] [spark] zhouyejoe commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r523716272



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -181,7 +181,8 @@ public void onFailure(Throwable e) {
         }
       });
     } catch (Exception e) {
-      logger.error("Exception while sending finalizeShuffleMerge request", e);
+      logger.error(String.format("Exception while sending finalizeShuffleMerge request to %s:%s",

Review comment:
       Updated.






[GitHub] [spark] AmplabJenkins commented on pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30163:
URL: https://github.com/apache/spark/pull/30163#issuecomment-717582688


   Can one of the admins verify this patch?




[GitHub] [spark] attilapiros commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r523741417



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -158,6 +158,34 @@ public void pushBlocks(
     }
   }
 
+  @Override
+  public void finalizeShuffleMerge(
+      String host,
+      int port,
+      int shuffleId,
+      MergeFinalizerListener listener) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
+      client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          listener.onShuffleMergeSuccess(
+            (MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
+        }
+
+        @Override
+        public void onFailure(Throwable e) {
+          listener.onShuffleMergeFailure(e);
+        }
+      });
+    } catch (Exception e) {
+      logger.error("Exception while sending finalizeShuffleMerge request to {}:{}",

Review comment:
       I am uncertain here: isn't this also a kind of shuffle merge failure?
   If you do not call `onShuffleMergeFailure` here, the error is not propagated.
   Wouldn't this cause the caller to wait endlessly for an asynchronous result?
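A minimal sketch of the concern raised here: if sending the request throws synchronously (for example, because the connection cannot be created), the catch block must also notify the listener, otherwise that shuffle service never reports back. The `Transport` interface and `FinalizingClient` class are hypothetical stand-ins for Spark's `TransportClient` plumbing, and the logging done in the real patch is omitted.

```java
// Simplified, hypothetical stand-in for the real MergeStatuses message.
final class MergeStatuses {
  final int shuffleId;

  MergeStatuses(int shuffleId) {
    this.shuffleId = shuffleId;
  }
}

interface MergeFinalizerListener {
  void onShuffleMergeSuccess(MergeStatuses statuses);
  void onShuffleMergeFailure(Throwable e);
}

// Hypothetical transport that may throw synchronously (e.g. connection refused)
// before any asynchronous callback has been registered.
interface Transport {
  void send(String host, int port, int shuffleId, MergeFinalizerListener callback)
      throws Exception;
}

final class FinalizingClient {
  private final Transport transport;

  FinalizingClient(Transport transport) {
    this.transport = transport;
  }

  void finalizeShuffleMerge(
      String host, int port, int shuffleId, MergeFinalizerListener listener) {
    try {
      transport.send(host, port, shuffleId, listener);
    } catch (Exception e) {
      // Route the synchronous failure to the listener so the caller learns
      // that this shuffle service will not respond.
      listener.onShuffleMergeFailure(e);
    }
  }
}
```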






[GitHub] [spark] mridulm commented on pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #30163:
URL: https://github.com/apache/spark/pull/30163#issuecomment-732432676


   Thanks for working on this, @zhouyejoe!
   And thanks for the reviews, @tgravescs, @attilapiros, @Ngone51, @otterc and @Victsm!




[GitHub] [spark] Victsm commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r523862805



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -158,6 +158,34 @@ public void pushBlocks(
     }
   }
 
+  @Override
+  public void finalizeShuffleMerge(
+      String host,
+      int port,
+      int shuffleId,
+      MergeFinalizerListener listener) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
+      client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          listener.onShuffleMergeSuccess(
+            (MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
+        }
+
+        @Override
+        public void onFailure(Throwable e) {
+          listener.onShuffleMergeFailure(e);
+        }
+      });
+    } catch (Exception e) {
+      logger.error("Exception while sending finalizeShuffleMerge request to {}:{}",

Review comment:
       Yes, this is a good catch.
   `onShuffleMergeFailure` should be called here as well.
   It won't cause endless waiting in the current implementation, though, as the caller (a separate thread in DAGScheduler) bounds the wait for merge results from all related shuffle service locations.
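One way to bound such a wait, sketched with `java.util.concurrent.CountDownLatch`: every response, success or failure, counts down the latch, and the caller waits at most a fixed timeout before proceeding with whatever statuses have arrived. This is not DAGScheduler's actual implementation; `BoundedMergeWait` and the `MergeStatuses` stub are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified, hypothetical stand-in for the real MergeStatuses message.
final class MergeStatuses {
  final int shuffleId;

  MergeStatuses(int shuffleId) {
    this.shuffleId = shuffleId;
  }
}

interface MergeFinalizerListener {
  void onShuffleMergeSuccess(MergeStatuses statuses);
  void onShuffleMergeFailure(Throwable e);
}

final class BoundedMergeWait {
  private final CountDownLatch latch;
  private final Map<String, MergeStatuses> merged = new ConcurrentHashMap<>();

  BoundedMergeWait(int expectedResponses) {
    this.latch = new CountDownLatch(expectedResponses);
  }

  // Listener for one shuffle service location; both outcomes count as a response.
  MergeFinalizerListener listenerFor(String location) {
    return new MergeFinalizerListener() {
      @Override
      public void onShuffleMergeSuccess(MergeStatuses statuses) {
        merged.put(location, statuses);
        latch.countDown();
      }

      @Override
      public void onShuffleMergeFailure(Throwable e) {
        latch.countDown();
      }
    };
  }

  // Waits until every service has responded or the timeout elapses, then
  // returns a snapshot of the statuses received so far (possibly partial).
  Map<String, MergeStatuses> awaitResults(long timeoutMs) {
    try {
      latch.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new HashMap<>(merged);
  }
}
```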






[GitHub] [spark] Ngone51 commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r516624455



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -158,6 +158,42 @@ public void pushBlocks(
     }
   }
 
+  /**
+   * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
+   * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+   * finishing the shuffle merge process associated with the shuffle mapper stage.
+   *
+   * @param host host of shuffle server
+   * @param port port of shuffle server.
+   * @param shuffleId shuffle ID of the shuffle to be finalized
+   * @param listener the listener to receive MergeStatuses
+   */
+  public void finalizeShuffleMerge(

Review comment:
       +1, we can get the client through `sc.env.blockManager.blockStoreClient` and add the method to `BlockStoreClient`.
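A trimmed-down sketch of the suggested layering: `finalizeShuffleMerge` is declared on the `BlockStoreClient` base class with a default that throws `UnsupportedOperationException` (the same pattern `pushBlocks` uses in the `BlockStoreClient` diff later in this thread), and only push-capable clients override it. The classes here are hypothetical stand-ins, not Spark's real ones; `FetchOnlyClient` and `PushCapableClient` are illustrative names, and a real override would send the RPC instead of answering locally.

```java
// Simplified, hypothetical stand-in for the real MergeStatuses message.
final class MergeStatuses {
  final int shuffleId;

  MergeStatuses(int shuffleId) {
    this.shuffleId = shuffleId;
  }
}

interface MergeFinalizerListener {
  void onShuffleMergeSuccess(MergeStatuses statuses);
  void onShuffleMergeFailure(Throwable e);
}

// Trimmed-down stand-in for the base client class.
abstract class BlockStoreClient {
  // Default: clients without push-based shuffle support reject the call.
  public void finalizeShuffleMerge(
      String host, int port, int shuffleId, MergeFinalizerListener listener) {
    throw new UnsupportedOperationException();
  }
}

// Hypothetical client that keeps the default behavior.
final class FetchOnlyClient extends BlockStoreClient {}

// Hypothetical push-capable client; answers locally instead of sending an RPC.
final class PushCapableClient extends BlockStoreClient {
  @Override
  public void finalizeShuffleMerge(
      String host, int port, int shuffleId, MergeFinalizerListener listener) {
    listener.onShuffleMergeSuccess(new MergeStatuses(shuffleId));
  }
}
```

The caller only depends on the base type, so it can use whichever client it obtains without knowing the concrete subclass.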






[GitHub] [spark] AmplabJenkins commented on pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30163:
URL: https://github.com/apache/spark/pull/30163#issuecomment-717583081


   Can one of the admins verify this patch?




[GitHub] [spark] zhouyejoe commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r522744016



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -158,6 +158,42 @@ public void pushBlocks(
     }
   }
 
+  /**
+   * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
+   * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+   * finishing the shuffle merge process associated with the shuffle mapper stage.
+   *
+   * @param host host of shuffle server
+   * @param port port of shuffle server.
+   * @param shuffleId shuffle ID of the shuffle to be finalized
+   * @param listener the listener to receive MergeStatuses
+   */
+  public void finalizeShuffleMerge(

Review comment:
       Tried it out and it worked with our WIP branch. Updated the PR. Please review. Thanks.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+
+public interface MergeFinalizerListener extends EventListener {

Review comment:
       Added.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Listener providing a callback function to invoke when driver receives the response for the finalize
+ * shuffle merge request sent to remote shuffle service.
+ */

Review comment:
       Added

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -156,4 +156,22 @@ public void pushBlocks(
       BlockFetchingListener listener) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
+   * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+   * finishing the shuffle merge process associated with the shuffle mapper stage.
+   *
+   * @param host host of shuffle server
+   * @param port port of shuffle server.
+   * @param shuffleId shuffle ID of the shuffle to be finalized
+   * @param listener the listener to receive MergeStatuses
+   */

Review comment:
       Added.






[GitHub] [spark] Victsm commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r513080596



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -158,6 +158,42 @@ public void pushBlocks(
     }
   }
 
+  /**
+   * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
+   * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+   * finishing the shuffle merge process associated with the shuffle mapper stage.
+   *
+   * @param host host of shuffle server
+   * @param port port of shuffle server.
+   * @param shuffleId shuffle ID of the shuffle to be finalized
+   * @param listener the listener to receive MergeStatuses
+   */
+  public void finalizeShuffleMerge(

Review comment:
       The part we want community input on is whether to put this API inside `ExternalBlockStoreClient` or in a separate client.
   This RPC will be used by the Spark driver (DAGScheduler) to finalize the shuffle merge when it decides to do so, and to retrieve the corresponding `MergeStatuses` from individual shuffle services for a given shuffle.
   Right now, we have to initialize an `ExternalBlockStoreClient` on the DAGScheduler side with some dummy parameters, like the following:
   ```scala
     private lazy val externalBlockStoreClient: Option[ExternalBlockStoreClient] =
       if (pushBasedShuffleEnabled) {
         val transConf = SparkTransportConf.fromSparkConf(sc.conf, "shuffle", 1)
         val shuffleClient = new ExternalBlockStoreClient(transConf, env.securityManager,
           env.securityManager.isAuthenticationEnabled(),
           sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
         shuffleClient.init(sc.conf.getAppId)
         Some(shuffleClient)
       } else {
         None
       }
   ```
   
   CC @Ngone51 @jiangxb1987 @attilapiros @tgravescs 
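To make the driver-side flow described above more concrete, here is a minimal, self-contained sketch of how a caller might send the finalize request to several shuffle services and gather the per-service responses through listener callbacks. `ShuffleServiceClient`, `MergeResult`, `FinalizeListener`, and `collectMergeStatuses` are hypothetical stand-ins written only for illustration; they are not Spark classes, and the real `MergeStatuses` message carries more than the placeholder fields used here. The callback names mirror `onShuffleMergeSuccess`/`onShuffleMergeFailure` from the diff.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FinalizeFanOutSketch {

  /** Hypothetical stand-in for the MergeStatuses message returned by one shuffle service. */
  static final class MergeResult {
    final int shuffleId;
    final int mergedPartitions;

    MergeResult(int shuffleId, int mergedPartitions) {
      this.shuffleId = shuffleId;
      this.mergedPartitions = mergedPartitions;
    }
  }

  /** Hypothetical listener mirroring the success/failure callbacks used in the PR. */
  interface FinalizeListener {
    void onShuffleMergeSuccess(MergeResult result);

    void onShuffleMergeFailure(Throwable e);
  }

  /** Hypothetical client abstraction; the placement of this call is what the comment asks about. */
  interface ShuffleServiceClient {
    void finalizeShuffleMerge(String host, int port, int shuffleId, FinalizeListener listener);
  }

  /**
   * Sends the finalize request to every "host:port" and waits up to timeoutMs for the replies.
   * Returns the results that arrived in time, keyed by "host:port"; failures are left out.
   */
  static Map<String, MergeResult> collectMergeStatuses(
      ShuffleServiceClient client, List<String> hostPorts, int shuffleId, long timeoutMs)
      throws InterruptedException {
    Map<String, MergeResult> results = new ConcurrentHashMap<>();
    CountDownLatch latch = new CountDownLatch(hostPorts.size());
    for (String hostPort : hostPorts) {
      int sep = hostPort.lastIndexOf(':');
      String host = hostPort.substring(0, sep);
      int port = Integer.parseInt(hostPort.substring(sep + 1));
      client.finalizeShuffleMerge(host, port, shuffleId, new FinalizeListener() {
        @Override
        public void onShuffleMergeSuccess(MergeResult result) {
          results.put(hostPort, result);
          latch.countDown();
        }

        @Override
        public void onShuffleMergeFailure(Throwable e) {
          // In this sketch a failed service simply contributes no entry to the result map.
          latch.countDown();
        }
      });
    }
    // Callbacks may arrive on other threads; wait for all of them or give up at the timeout.
    latch.await(timeoutMs, TimeUnit.MILLISECONDS);
    return results;
  }

  public static void main(String[] args) throws InterruptedException {
    // Fake client that answers synchronously and fails for one host.
    ShuffleServiceClient fake = (host, port, shuffleId, listener) -> {
      if (host.equals("bad-host")) {
        listener.onShuffleMergeFailure(new RuntimeException("connection refused"));
      } else {
        listener.onShuffleMergeSuccess(new MergeResult(shuffleId, 4));
      }
    };
    Map<String, MergeResult> statuses = collectMergeStatuses(
        fake, List.of("host-a:7337", "bad-host:7337", "host-b:7337"), 0, 1000);
    System.out.println(new TreeSet<>(statuses.keySet())); // prints [host-a:7337, host-b:7337]
  }
}
```

Whichever class ends up owning `finalizeShuffleMerge`, the aggregation logic on the driver side would look roughly the same, since it only depends on the per-service callback contract.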






[GitHub] [spark] tgravescs commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r523265073



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -156,4 +156,22 @@ public void pushBlocks(
       BlockFetchingListener listener) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
+   * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+   * finishing the shuffle merge process associated with the shuffle mapper stage.
+   *
+   * @param host host of shuffle server
+   * @param port port of shuffle server.
+   * @param shuffleId shuffle ID of the shuffle to be finalized
+   * @param listener the listener to receive MergeStatuses
+   */

Review comment:
      Add `@since 3.1.0` to the Javadoc.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -158,6 +158,33 @@ public void pushBlocks(
     }
   }
 
+  @Override
+  public void finalizeShuffleMerge(
+      String host,
+      int port,
+      int shuffleId,
+      MergeFinalizerListener listener) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
+      client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          listener.onShuffleMergeSuccess(
+            (MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
+        }
+
+        @Override
+        public void onFailure(Throwable e) {
+          listener.onShuffleMergeFailure(e);
+        }
+      });
+    } catch (Exception e) {
+      logger.error("Exception while sending finalizeShuffleMerge request", e);

Review comment:
      Perhaps add the host and port to the log message so we know which shuffle service the request failed for.






[GitHub] [spark] asfgit closed pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #30163:
URL: https://github.com/apache/spark/pull/30163


   




[GitHub] [spark] Victsm commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
Victsm commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r523715366



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -181,7 +181,8 @@ public void onFailure(Throwable e) {
         }
       });
     } catch (Exception e) {
-      logger.error("Exception while sending finalizeShuffleMerge request", e);
+      logger.error(String.format("Exception while sending finalizeShuffleMerge request to %s:%s",

Review comment:
      Nit: use `{}` placeholders instead of `String.format` to be more concise.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30163:
URL: https://github.com/apache/spark/pull/30163#issuecomment-717582688


   Can one of the admins verify this patch?




[GitHub] [spark] mridulm commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r523086823



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Listener providing a callback function to invoke when the driver receives the response for the
+ * finalize shuffle merge request sent to a remote shuffle service.
+ */

Review comment:
       Add `@since`
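For reference, a sketch of how the listener's Javadoc might read with the requested tag added is shown below. It mirrors the interface from the diff (extending `java.util.EventListener`, with the `onShuffleMergeSuccess`/`onShuffleMergeFailure` callbacks invoked from `ExternalBlockStoreClient`). `MergeStatuses` is replaced by a hypothetical placeholder class so the snippet compiles on its own, the interface is package-private only so everything fits in one file, and the `3.1.0` version is assumed from the reviewer's comment on `BlockStoreClient` rather than taken from the merged code.

```java
import java.util.EventListener;

public class MergeFinalizerListenerDemo {
  public static void main(String[] args) {
    MergeFinalizerListener listener = new MergeFinalizerListener() {
      @Override
      public void onShuffleMergeSuccess(MergeStatuses statuses) {
        System.out.println("merged shuffle " + statuses.shuffleId);
      }

      @Override
      public void onShuffleMergeFailure(Throwable e) {
        System.out.println("finalize failed: " + e.getMessage());
      }
    };
    // Simulate the RPC success callback firing for shuffle 7.
    listener.onShuffleMergeSuccess(new MergeStatuses(7)); // prints "merged shuffle 7"
  }
}

/**
 * :: DeveloperApi ::
 *
 * Listener providing a callback function to invoke when the driver receives the response for the
 * finalize shuffle merge request sent to a remote shuffle service.
 *
 * @since 3.1.0
 */
interface MergeFinalizerListener extends EventListener {
  /** Called with the merge metadata returned by the shuffle service. */
  void onShuffleMergeSuccess(MergeStatuses statuses);

  /** Called when the finalize request fails, e.g. because the RPC could not be delivered. */
  void onShuffleMergeFailure(Throwable e);
}

/** Hypothetical placeholder for org.apache.spark.network.shuffle.protocol.MergeStatuses. */
final class MergeStatuses {
  final int shuffleId;

  MergeStatuses(int shuffleId) {
    this.shuffleId = shuffleId;
  }
}
```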






[GitHub] [spark] zhouyejoe commented on a change in pull request #30163: [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #30163:
URL: https://github.com/apache/spark/pull/30163#discussion_r517852660



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -158,6 +158,42 @@ public void pushBlocks(
     }
   }
 
+  /**
+   * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
+   * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+   * finishing the shuffle merge process associated with the shuffle mapper stage.
+   *
+   * @param host host of shuffle server
+   * @param port port of shuffle server.
+   * @param shuffleId shuffle ID of the shuffle to be finalized
+   * @param listener the listener to receive MergeStatuses
+   */
+  public void finalizeShuffleMerge(

Review comment:
       Good suggestion. I will try it out and update the PR accordingly.



