You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2021/08/10 21:55:46 UTC

[spark] branch branch-3.2 updated: [SPARK-36378][SHUFFLE] Switch to using RPCResponse to communicate common block push failures to the client

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new c6b683e  [SPARK-36378][SHUFFLE] Switch to using RPCResponse to communicate common block push failures to the client
c6b683e is described below

commit c6b683e5a2b547b0259c83c90e69739252eb8fbf
Author: Min Shen <ms...@linkedin.com>
AuthorDate: Tue Aug 10 16:46:55 2021 -0500

    [SPARK-36378][SHUFFLE] Switch to using RPCResponse to communicate common block push failures to the client
    
    We have run performance evaluations on the version of push-based shuffle committed to upstream so far, and have identified a few places for further improvements:
    1. On the server side, we have noticed that the usage of `String.format`, especially when receiving a block push request, has a much higher overhead compared with string concatenation.
    2. On the server side, the usage of `Throwables.getStackTraceAsString` in `ErrorHandler.shouldRetryError` and `ErrorHandler.shouldLogError` generates considerable overhead.
    
    These 2 issues are related to how we currently handle certain common block push failures.
    We communicate such failures via `RPCFailure` by transmitting the exception stack trace.
    This adds overhead on both the server and the client side for creating these exceptions, and it makes checking the type of failure fragile and inefficient because it relies on string matching against the exception stack trace.
    To address these issues, this PR also proposes to encode each common block push failure as an error code and send it back to the client in a proper RPC response message.
    
    Improve shuffle service efficiency for push-based shuffle.
    Improve code robustness for handling block push failures.
    
    No user-facing change.
    
    Existing unit tests.
    
    Closes #33613 from Victsm/SPARK-36378.
    
    Lead-authored-by: Min Shen <ms...@linkedin.com>
    Co-authored-by: Min Shen <vi...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 3f09093a21306b0fbcb132d4c9f285e56ac6b43c)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../spark/network/client/StreamCallbackWithID.java |  10 ++
 .../network/server/BlockPushNonFatalFailure.java   | 157 +++++++++++++++++++++
 .../network/server/TransportRequestHandler.java    |  10 +-
 .../apache/spark/network/shuffle/ErrorHandler.java |  33 +----
 .../network/shuffle/ExternalBlockStoreClient.java  |   2 +-
 .../network/shuffle/OneForOneBlockPusher.java      |  85 ++++++-----
 .../network/shuffle/RemoteBlockPushResolver.java   | 147 ++++++++++++-------
 .../shuffle/protocol/BlockPushReturnCode.java      |  94 ++++++++++++
 .../shuffle/protocol/BlockTransferMessage.java     |   4 +-
 .../spark/network/shuffle/ErrorHandlerSuite.java   |  23 +--
 .../network/shuffle/OneForOneBlockPusherSuite.java |  14 +-
 .../shuffle/RemoteBlockPushResolverSuite.java      | 105 +++++++++-----
 .../apache/spark/shuffle/ShuffleBlockPusher.scala  |  12 +-
 .../spark/shuffle/ShuffleBlockPusherSuite.scala    |  38 ++---
 14 files changed, 547 insertions(+), 187 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java b/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java
index bd173b6..3ee524a 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java
@@ -17,6 +17,16 @@
 
 package org.apache.spark.network.client;
 
+import java.nio.ByteBuffer;
+
 public interface StreamCallbackWithID extends StreamCallback {
   String getID();
+
+  /**
+   * Response to return to client upon the completion of a stream. Currently only invoked in
+   * {@link org.apache.spark.network.server.TransportRequestHandler#processStreamUpload}
+   */
+  default ByteBuffer getCompletionResponse() {
+    return ByteBuffer.allocate(0);
+  }
 }
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
new file mode 100644
index 0000000..5906fa2
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A special RuntimeException thrown when shuffle service experiences a non-fatal failure
+ * with handling block push requests with push-based shuffle. Due to the best-effort nature
+ * of push-based shuffle, this exception gets thrown in certain relatively common
+ * cases, such as when a pushed block is received after the corresponding shuffle is
+ * merge finalized or when a pushed block experiences merge collision. Under these
+ * scenarios, we throw this special RuntimeException.
+ */
+public class BlockPushNonFatalFailure extends RuntimeException {
+  /**
+   * String constant used for generating exception messages indicating a block to be merged
+   * arrives too late on the server side. When we get a block push failure because the
+   * block arrives too late, we will neither retry pushing the block nor log the exception
+   * on the client side.
+   */
+  public static final String TOO_LATE_BLOCK_PUSH_MESSAGE_SUFFIX =
+    " is received after merged shuffle is finalized";
+
+  /**
+   * String constant used for generating exception messages indicating a block to be merged
+   * is a stale block push in the case of indeterminate stage retries on the server side.
+   * When we get a block push failure because of the block push being stale, we will not
+   * retry pushing the block nor log the exception on the client side.
+   */
+  public static final String STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
+    " is a stale block push from an indeterminate stage retry";
+
+  /**
+   * String constant used for generating exception messages indicating the server couldn't
+   * append a block after all available attempts due to collision with other blocks belonging
+   * to the same shuffle partition. When we get a block push failure because the block
+   * couldn't be written for this reason, we will not log the exception on the client side.
+   */
+  public static final String BLOCK_APPEND_COLLISION_MSG_SUFFIX =
+    " experienced merge collision on the server side";
+
+  /**
+   * The error code of the failure, encoded as a ByteBuffer to be responded back to the client.
+   * Instead of responding with an RPCFailure carrying the exception stack trace as payload,
+   * which makes checking the content of the exception very tedious on the client side,
+   * we can respond with a proper RPCResponse to make it more robust and efficient. This
+   * field is only set on the shuffle server side when the exception is originally generated.
+   */
+  private ByteBuffer response;
+
+  /**
+   * The error code of the failure. This field is only set on the client side when a
+   * BlockPushNonFatalFailure is recreated from the error code received from the server.
+   */
+  private ReturnCode returnCode;
+
+  public BlockPushNonFatalFailure(ByteBuffer response, String msg) {
+    super(msg);
+    this.response = response;
+  }
+
+  public BlockPushNonFatalFailure(ReturnCode returnCode, String msg) {
+    super(msg);
+    this.returnCode = returnCode;
+  }
+
+  /**
+   * Since this type of exception is used to only convey the error code, we reduce the
+   * exception initialization overhead by skipping filling the stack trace.
+   */
+  @Override
+  public synchronized Throwable fillInStackTrace() {
+    return this;
+  }
+
+  public ByteBuffer getResponse() {
+    // Ensure we do not invoke this method if response is not set
+    Preconditions.checkNotNull(response);
+    return response;
+  }
+
+  public ReturnCode getReturnCode() {
+    // Ensure we do not invoke this method if returnCode is not set
+    Preconditions.checkNotNull(returnCode);
+    return returnCode;
+  }
+
+  public enum ReturnCode {
+    /**
+     * Indicate the case of a successful merge of a pushed block.
+     */
+    SUCCESS(0, ""),
+    /**
+     * Indicate a block to be merged arrives too late on the server side, i.e. after the
+     * corresponding shuffle has been merge finalized. When the client gets this code, it
+     * will not retry pushing the block.
+     */
+    TOO_LATE_BLOCK_PUSH(1, TOO_LATE_BLOCK_PUSH_MESSAGE_SUFFIX),
+    /**
+     * Indicating the server couldn't append a block after all available attempts due to
+     * collision with other blocks belonging to the same shuffle partition.
+     */
+    BLOCK_APPEND_COLLISION_DETECTED(2, BLOCK_APPEND_COLLISION_MSG_SUFFIX),
+    /**
+     * Indicate a block received on the server side is a stale block push in the case of
+     * indeterminate stage retries. When the client receives this code, it will not retry
+     * pushing the block.
+     */
+    STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+
+    private final byte id;
+    // Error message suffix used to generate an error message for a given ReturnCode and
+    // a given block ID
+    private final String errorMsgSuffix;
+
+    ReturnCode(int id, String errorMsgSuffix) {
+      assert id < 128 : "Cannot have more than 128 block push return code";
+      this.id = (byte) id;
+      this.errorMsgSuffix = errorMsgSuffix;
+    }
+
+    public byte id() { return id; }
+  }
+
+  public static ReturnCode getReturnCode(byte id) {
+    switch (id) {
+      case 0: return ReturnCode.SUCCESS;
+      case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
+      case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
+      case 3: return ReturnCode.STALE_BLOCK_PUSH;
+      default: throw new IllegalArgumentException("Unknown block push return code: " + id);
+    }
+  }
+
+  public static String getErrorMsg(String blockId, ReturnCode errorCode) {
+    Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
+    return "Block " + blockId + errorCode.errorMsgSuffix;
+  }
+}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index ab2deac..5c07f20 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -213,7 +213,15 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
         public void onComplete(String streamId) throws IOException {
            try {
              streamHandler.onComplete(streamId);
-             callback.onSuccess(ByteBuffer.allocate(0));
+             callback.onSuccess(streamHandler.getCompletionResponse());
+           } catch (BlockPushNonFatalFailure ex) {
+             // Respond an RPC message with the error code to client instead of using exceptions
+             // encoded in the RPCFailure. This type of exceptions gets thrown more frequently
+             // than a regular exception on the shuffle server side due to the best-effort nature
+             // of push-based shuffle and requires special handling on the client side. Using a
+             // proper RPCResponse is more efficient.
+             callback.onSuccess(ex.getResponse());
+             streamHandler.onFailure(streamId, ex);
            } catch (Exception ex) {
              IOException ioExc = new IOException("Failure post-processing complete stream;" +
                " failing this rpc and leaving channel active", ex);
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 0149ad7..271d762 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -23,6 +23,9 @@ import java.net.ConnectException;
 import com.google.common.base.Throwables;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+
+import static org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode.*;
 
 /**
  * Plugs into {@link RetryingBlockTransferor} to further control when an exception should be retried
@@ -54,27 +57,6 @@ public interface ErrorHandler {
    */
   class BlockPushErrorHandler implements ErrorHandler {
     /**
-     * String constant used for generating exception messages indicating a block to be merged
-     * arrives too late or stale block push in the case of indeterminate stage retries on the
-     * server side, and also for later checking such exceptions on the client side. When we get
-     * a block push failure because of the block push being stale or arrives too late, we will
-     * not retry pushing the block nor log the exception on the client side.
-     */
-    public static final String TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
-      "received after merged shuffle is finalized or stale block push as shuffle blocks of a"
-        + " higher shuffleMergeId for the shuffle is being pushed";
-
-    /**
-     * String constant used for generating exception messages indicating the server couldn't
-     * append a block after all available attempts due to collision with other blocks belonging
-     * to the same shuffle partition, and also for later checking such exceptions on the client
-     * side. When we get a block push failure because of the block couldn't be written due to
-     * this reason, we will not log the exception on the client side.
-     */
-    public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
-      "Couldn't find an opportunity to write block";
-
-    /**
      * String constant used for generating exception messages indicating the server encountered
      * IOExceptions multiple times, greater than the configured threshold, while trying to merged
      * shuffle blocks of the same shuffle partition. When the client receives this this response,
@@ -105,16 +87,15 @@ public interface ErrorHandler {
         return false;
       }
 
-      String errorStackTrace = Throwables.getStackTraceAsString(t);
       // If the block is too late or stale block push, there is no need to retry it
-      return !errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+      return !(t instanceof BlockPushNonFatalFailure &&
+        (((BlockPushNonFatalFailure) t).getReturnCode() == TOO_LATE_BLOCK_PUSH ||
+          ((BlockPushNonFatalFailure) t).getReturnCode() == STALE_BLOCK_PUSH));
     }
 
     @Override
     public boolean shouldLogError(Throwable t) {
-      String errorStackTrace = Throwables.getStackTraceAsString(t);
-      return !(errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) ||
-        errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+      return !(t instanceof BlockPushNonFatalFailure);
     }
   }
 
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 826402c..4c0e9f3 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -218,7 +218,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
           public void onSuccess(int numChunks, ManagedBuffer buffer) {
             logger.trace("Successfully got merged block meta for shuffleId {} shuffleMergeId {}"
               + " reduceId {}", shuffleId, shuffleMergeId, reduceId);
-            listener.onSuccess(shuffleId, reduceId, shuffleMergeId,
+            listener.onSuccess(shuffleId, shuffleMergeId, reduceId,
               new MergedBlockMeta(numChunks, buffer));
           }
 
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
index f9d313c..8885dc9 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,6 +29,10 @@ import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.PushBlockStream;
 
 /**
@@ -77,42 +82,58 @@ public class OneForOneBlockPusher {
 
     @Override
     public void onSuccess(ByteBuffer response) {
-      // On receipt of a successful block push
-      listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+      BlockPushReturnCode pushResponse =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(response);
+      // If the return code is not SUCCESS, the server has responded some error code. Handle
+      // the error accordingly.
+      ReturnCode returnCode = BlockPushNonFatalFailure.getReturnCode(pushResponse.returnCode);
+      if (returnCode != ReturnCode.SUCCESS) {
+        String blockId = pushResponse.failureBlockId;
+        Preconditions.checkArgument(!blockId.isEmpty());
+        checkAndFailRemainingBlocks(index, new BlockPushNonFatalFailure(returnCode,
+          BlockPushNonFatalFailure.getErrorMsg(blockId, returnCode)));
+      } else {
+        // On receipt of a successful block push
+        listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+      }
     }
 
     @Override
     public void onFailure(Throwable e) {
-      // Since block push is best effort, i.e., if we encounter a block push failure that's still
-      // retriable according to ErrorHandler (not a connection exception and the block is not too
-      // late), we should not fail all remaining block pushes even though
-      // RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
-      // count etc). The best effort nature makes block push tolerable of a partial completion.
-      // Thus, we only fail the block that's actually failed in this case. Note that, on the
-      // RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
-      // previous active retry listener, and retry pushing all outstanding blocks. However, since
-      // the blocks to be pushed are preloaded into memory and the first attempt of pushing these
-      // blocks might have already succeeded, retry pushing all the outstanding blocks should be
-      // very cheap (on the client side, the block data is in memory; on the server side, the block
-      // will be recognized as a duplicate which triggers noop handling). Here, by failing only the
-      // one block that's actually failed, we are essentially preventing forwarding unnecessary
-      // block push failures to the parent listener of the retry listener.
-      //
-      // Take the following as an example. For the common exception during block push handling,
-      // i.e. block collision, it is considered as retriable by ErrorHandler but not retriable
-      // by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
-      // one block encountering this issue not the remaining blocks in the same batch. On the
-      // RetryingBlockTransferor side, since this exception is considered as not retriable, it
-      // would immediately invoke parent listener's onBlockTransferFailure. However, the remaining
-      // blocks in the same batch would remain current and active and they won't be impacted by
-      // this exception.
-      if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
-        String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
-        failRemainingBlocks(targetBlockId, e);
-      } else {
-        String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
-        failRemainingBlocks(targetBlockId, e);
-      }
+      checkAndFailRemainingBlocks(index, e);
+    }
+  }
+
+  private void checkAndFailRemainingBlocks(int index, Throwable e) {
+    // Since block push is best effort, i.e., if we encounter a block push failure that's still
+    // retriable according to ErrorHandler (not a connection exception and the block is not too
+    // late), we should not fail all remaining block pushes even though
+    // RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
+    // count etc). The best effort nature makes block push tolerable of a partial completion.
+    // Thus, we only fail the block that's actually failed in this case. Note that, on the
+    // RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
+    // previous active retry listener, and retry pushing all outstanding blocks. However, since
+    // the blocks to be pushed are preloaded into memory and the first attempt of pushing these
+    // blocks might have already succeeded, retry pushing all the outstanding blocks should be
+    // very cheap (on the client side, the block data is in memory; on the server side, the block
+    // will be recognized as a duplicate which triggers noop handling). Here, by failing only the
+    // one block that's actually failed, we are essentially preventing forwarding unnecessary
+    // block push failures to the parent listener of the retry listener.
+    //
+    // Take the following as an example. For the common exception during block push handling,
+    // i.e. block collision, it is considered as retriable by ErrorHandler but not retriable
+    // by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
+    // one block encountering this issue not the remaining blocks in the same batch. On the
+    // RetryingBlockTransferor side, since this exception is considered as not retriable, it
+    // would immediately invoke parent listener's onBlockTransferFailure. However, the remaining
+    // blocks in the same batch would remain current and active and they won't be impacted by
+    // this exception.
+    if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
+      String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
+      failRemainingBlocks(targetBlockId, e);
+    } else {
+      String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
+      failRemainingBlocks(targetBlockId, e);
     }
   }
 
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 4f26ddf..80174d1 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -56,6 +56,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
 import org.apache.spark.network.shuffle.protocol.MergeStatuses;
@@ -81,6 +84,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   private static final int UNDEFINED_ATTEMPT_ID = -1;
   // Shuffles of determinate stages will have shuffleMergeId set to 0
   private static final int DETERMINATE_SHUFFLE_MERGE_ID = 0;
+  private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler();
+  // ByteBuffer to respond to client upon a successful merge of a pushed block
+  private static final ByteBuffer SUCCESS_RESPONSE =
+    new BlockPushReturnCode(ReturnCode.SUCCESS.id(), "").toByteBuffer().asReadOnlyBuffer();
 
   // ConcurrentHashMap doesn't allow null for keys or values which is why this is required.
   // Marker to identify finalized indeterminate shuffle partitions in the case of indeterminate
@@ -101,7 +108,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   private final TransportConf conf;
   private final int minChunkSize;
   private final int ioExceptionsThresholdDuringMerge;
-  private final ErrorHandler.BlockPushErrorHandler errorHandler;
 
   @SuppressWarnings("UnstableApiUsage")
   private final LoadingCache<File, ShuffleIndexInformation> indexCache;
@@ -125,7 +131,19 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       .maximumWeight(conf.mergedIndexCacheSize())
       .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
       .build(indexCacheLoader);
-    this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
+  }
+
+  @VisibleForTesting
+  protected static ErrorHandler.BlockPushErrorHandler createErrorHandler() {
+    return new ErrorHandler.BlockPushErrorHandler() {
+      // Explicitly use a shuffle service side error handler for handling exceptions.
+      // BlockPushNonFatalFailure on the server side only has the response field set. It
+      // might require different handling logic compared with a client side error handler.
+      @Override
+      public boolean shouldLogError(Throwable t) {
+        return !(t instanceof BlockPushNonFatalFailure);
+      }
+    };
   }
 
   @VisibleForTesting
@@ -146,7 +164,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       AppShuffleInfo appShuffleInfo,
       int shuffleId,
       int shuffleMergeId,
-      int reduceId) throws StaleBlockPushException {
+      int reduceId,
+      String blockId) throws BlockPushNonFatalFailure {
     ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> shuffles = appShuffleInfo.shuffles;
     AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
       shuffles.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
@@ -158,7 +177,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           // In that case the block is considered late. In the case of indeterminate stages, most
           // recent shuffleMergeId finalized would be pointing to INDETERMINATE_SHUFFLE_FINALIZED
           if (dataFile.exists()) {
-            return null;
+            throw new BlockPushNonFatalFailure(new BlockPushReturnCode(
+              ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
+              BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH));
           } else {
             logger.info("Creating a new attempt for shuffle blocks push request for shuffle {}"
               + " with shuffleMergeId {} for application {}_{}", shuffleId, shuffleMergeId,
@@ -170,10 +191,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           // current incoming one
           int latestShuffleMergeId = appShuffleMergePartitionsInfo.shuffleMergeId;
           if (latestShuffleMergeId > shuffleMergeId) {
-            throw new StaleBlockPushException(String.format("Rejecting shuffle blocks push request"
-              + " for shuffle %s with shuffleMergeId %s for application %s_%s as a higher"
-              + " shuffleMergeId %s request is already seen", shuffleId, shuffleMergeId,
-              appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId));
+            throw new BlockPushNonFatalFailure(
+              new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
+              BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.STALE_BLOCK_PUSH));
           } else if (latestShuffleMergeId == shuffleMergeId) {
             return appShuffleMergePartitionsInfo;
           } else {
@@ -194,7 +214,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     // It only gets here when the shuffle is already finalized.
     if (null == shufflePartitionsWithMergeId ||
         INDETERMINATE_SHUFFLE_FINALIZED == shufflePartitionsWithMergeId.shuffleMergePartitions) {
-      return null;
+      throw new BlockPushNonFatalFailure(
+        new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
+        BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH));
     }
 
     Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
@@ -379,9 +401,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
-    final String streamId = String.format("%s_%d_%d_%d_%d",
-      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, msg.shuffleMergeId,
-      msg.mapIndex, msg.reduceId);
     if (appShuffleInfo.attemptId != msg.appAttemptId) {
       // If this Block belongs to a former application attempt, it is considered late,
       // as only the blocks from the current application attempt will be merged
@@ -391,14 +410,20 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           + "with the current attempt id %s stored in shuffle service for application %s",
           msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
     }
+    // Use string concatenation here to avoid the overhead of String.format on every
+    // pushed block.
+    final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + "_"
+      + msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" + msg.reduceId;
     // Retrieve merged shuffle file metadata
     AppShufflePartitionInfo partitionInfoBeforeCheck;
+    BlockPushNonFatalFailure failure = null;
     try {
       partitionInfoBeforeCheck = getOrCreateAppShufflePartitionInfo(appShuffleInfo, msg.shuffleId,
-        msg.shuffleMergeId, msg.reduceId);
-    } catch(StaleBlockPushException sbp) {
+        msg.shuffleMergeId, msg.reduceId, streamId);
+    } catch (BlockPushNonFatalFailure bpf) {
       // Set partitionInfoBeforeCheck to null so that stale block push gets handled.
       partitionInfoBeforeCheck = null;
+      failure = bpf;
     }
     // Here partitionInfo will be null in 3 cases:
     // 1) The request is received for a block that has already been merged, this is possible due
@@ -442,17 +467,15 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     // getting killed. When this happens, we need to distinguish the duplicate blocks as they
     // arrive. More details on this is explained in later comments.
 
-    // Track if the block is received after shuffle merge finalized or from an older
-    // shuffleMergeId attempt.
-    final boolean isStaleBlockOrTooLate = partitionInfoBeforeCheck == null;
     // Check if the given block is already merged by checking the bitmap against the given map
     // index
-    final AppShufflePartitionInfo partitionInfo = isStaleBlockOrTooLate ? null :
+    final AppShufflePartitionInfo partitionInfo = failure != null ? null :
       partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null : partitionInfoBeforeCheck;
     if (partitionInfo != null) {
       return new PushBlockStreamCallback(
         this, appShuffleInfo, streamId, partitionInfo, msg.mapIndex);
     } else {
+      final BlockPushNonFatalFailure finalFailure = failure;
       // For a duplicate block or a block which is late or stale block from an older
       // shuffleMergeId, respond back with a callback that handles them differently.
       return new StreamCallbackWithID() {
@@ -469,11 +492,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
         @Override
         public void onComplete(String streamId) {
-          if (isStaleBlockOrTooLate) {
-            // Throw an exception here so the block data is drained from channel and server
-            // responds RpcFailure to the client.
-            throw new RuntimeException(String.format("Block %s %s", streamId,
-              ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+          // Throw the non-fatal failure here so the block data is drained from the channel and
+          // the server responds to the client with the error code.
+          if (finalFailure != null) {
+            throw finalFailure;
           }
           // For duplicate block that is received before the shuffle merge finalizes, the
           // server should respond success to the client.
@@ -482,6 +504,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         @Override
         public void onFailure(String streamId, Throwable cause) {
         }
+
+        @Override
+        public ByteBuffer getCompletionResponse() {
+          return SUCCESS_RESPONSE.duplicate();
+        }
       };
     }
   }
@@ -675,6 +702,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       return streamId;
     }
 
+    @Override
+    public ByteBuffer getCompletionResponse() {
+      return SUCCESS_RESPONSE.duplicate();
+    }
+
     /**
      * Write a ByteBuffer to the merged shuffle file. Here we keep track of the length of the
      * block data written to file. In case of failure during writing block to file, we use the
@@ -752,20 +784,26 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     /**
+     * If appShuffleMergePartitionsInfo's shuffleMergeId is
+     * greater than the request shuffleMergeId then it is a stale block push.
+     */
+    private boolean isStale(
+        AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
+        int shuffleMergeId) {
+      return appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId;
+    }
+
+    /**
      * If appShuffleMergePartitionsInfo is null or shuffleMergePartitions is set to
      * INDETERMINATE_SHUFFLE_FINALIZED or if the reduceId is not in the map then the
-     * shuffle is already finalized. Therefore the block push is too late. If
-     * appShuffleMergePartitionsInfo's shuffleMergeId is
-     * greater than the request shuffleMergeId then it is a stale block push.
+     * shuffle is already finalized. Therefore the block push is too late.
      */
-    private boolean isStaleOrTooLate(
+    private boolean isTooLate(
         AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
-        int shuffleMergeId,
         int reduceId) {
       return null == appShuffleMergePartitionsInfo ||
         INDETERMINATE_SHUFFLE_FINALIZED == appShuffleMergePartitionsInfo.shuffleMergePartitions ||
-          appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId ||
-          !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
+        !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
     }
 
     @Override
@@ -785,8 +823,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       // to disk as well. This way, we avoid having to buffer the entirety of every blocks in
       // memory, while still providing the necessary guarantee.
       synchronized (partitionInfo) {
-        if (isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
-            partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
+        AppShuffleMergePartitionsInfo info = appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
+        if (isStale(info, partitionInfo.shuffleMergeId) ||
+            isTooLate(info, partitionInfo.reduceId)) {
           deferredBufs = null;
           return;
         }
@@ -857,12 +896,19 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         // was not received yet or this was the latest stage attempt (or latest shuffleMergeId)
         // generating shuffle output for the shuffle ID. By the time we finish reading this
         // message, the block request is either stale or too late. We should thus respond
-        // RpcFailure to the client.
-        if (isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
-            partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
+        // with the error code to the client.
+        AppShuffleMergePartitionsInfo info = appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
+        if (isTooLate(info, partitionInfo.reduceId)) {
           deferredBufs = null;
-          throw new RuntimeException(String.format("Block %s is %s", streamId,
-            ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+          throw new BlockPushNonFatalFailure(
+            new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), streamId).toByteBuffer(),
+            BlockPushNonFatalFailure.getErrorMsg(streamId, ReturnCode.TOO_LATE_BLOCK_PUSH));
+        }
+        if (isStale(info, partitionInfo.shuffleMergeId)) {
+          deferredBufs = null;
+          throw new BlockPushNonFatalFailure(
+            new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), streamId).toByteBuffer(),
+            BlockPushNonFatalFailure.getErrorMsg(streamId, ReturnCode.STALE_BLOCK_PUSH));
         }
 
         // Check if we can commit this block
@@ -911,9 +957,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           }
         } else {
           deferredBufs = null;
-          throw new RuntimeException(String.format("%s %s to merged shuffle",
-            ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX,
-            streamId));
+          throw new BlockPushNonFatalFailure(
+            new BlockPushReturnCode(ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), streamId)
+              .toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(
+                streamId, ReturnCode.BLOCK_APPEND_COLLISION_DETECTED));
         }
       }
       isWriting = false;
@@ -921,7 +968,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
     @Override
     public void onFailure(String streamId, Throwable throwable) throws IOException {
-      if (mergeManager.errorHandler.shouldLogError(throwable)) {
+      if (ERROR_HANDLER.shouldLogError(throwable)) {
         logger.error("Encountered issue when merging {}", streamId, throwable);
       } else {
         logger.debug("Encountered issue when merging {}", streamId, throwable);
@@ -932,13 +979,15 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       // an opportunity to write the block data to disk, we should also ignore here.
       if (isWriting) {
         synchronized (partitionInfo) {
-          if (!isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
-              partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
-              logger.debug("{} encountered failure", partitionInfo);
-              partitionInfo.setCurrentMapIndex(-1);
-            }
+          AppShuffleMergePartitionsInfo info =
+            appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
+          if (!isTooLate(info, partitionInfo.reduceId) &&
+              !isStale(info, partitionInfo.shuffleMergeId)) {
+            logger.debug("{} encountered failure", partitionInfo);
+            partitionInfo.setCurrentMapIndex(-1);
           }
         }
+      }
       isWriting = false;
     }
 
@@ -1356,10 +1405,4 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       return pos;
     }
   }
-
-  public static class StaleBlockPushException extends RuntimeException {
-    public StaleBlockPushException(String message) {
-      super(message);
-    }
-  }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java
new file mode 100644
index 0000000..0455d67
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+
+/**
+ * Error code indicating a non-fatal failure of a block push request.
+ * Due to the best-effort nature of push-based shuffle, these failures
+ * do not impact the completion of the block push process. The list of
+ * such errors is in
+ * {@link org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode}.
+ *
+ * @since 3.2.0
+ */
+public class BlockPushReturnCode extends BlockTransferMessage {
+  public final byte returnCode;
+  // Block ID of the block that experiences a non-fatal block push failure.
+  // Will be an empty string for any successfully pushed block.
+  public final String failureBlockId;
+
+  public BlockPushReturnCode(byte returnCode, String failureBlockId) {
+    Preconditions.checkNotNull(BlockPushNonFatalFailure.getReturnCode(returnCode));
+    this.returnCode = returnCode;
+    this.failureBlockId = failureBlockId;
+  }
+
+  @Override
+  protected Type type() {
+    return Type.PUSH_BLOCK_RETURN_CODE;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(returnCode, failureBlockId);
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+      .append("returnCode", returnCode)
+      .append("failureBlockId", failureBlockId)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof BlockPushReturnCode) {
+      BlockPushReturnCode o = (BlockPushReturnCode) other;
+      return returnCode == o.returnCode && Objects.equals(failureBlockId, o.failureBlockId);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return 1 + Encoders.Strings.encodedLength(failureBlockId);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    buf.writeByte(returnCode);
+    Encoders.Strings.encode(buf, failureBlockId);
+  }
+
+  public static BlockPushReturnCode decode(ByteBuf buf) {
+    byte type = buf.readByte();
+    String failureBlockId = Encoders.Strings.decode(buf);
+    return new BlockPushReturnCode(type, failureBlockId);
+  }
+}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 453791d..ad959c7e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -49,7 +49,8 @@ public abstract class BlockTransferMessage implements Encodable {
     HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6), REMOVE_BLOCKS(7), BLOCKS_REMOVED(8),
     FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11),
     PUSH_BLOCK_STREAM(12), FINALIZE_SHUFFLE_MERGE(13), MERGE_STATUSES(14),
-    FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), CORRUPTION_CAUSE(17);
+    FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), CORRUPTION_CAUSE(17),
+    PUSH_BLOCK_RETURN_CODE(18);
 
     private final byte id;
 
@@ -86,6 +87,7 @@ public abstract class BlockTransferMessage implements Encodable {
         case 15: return FetchShuffleBlockChunks.decode(buf);
         case 16: return DiagnoseCorruption.decode(buf);
         case 17: return CorruptionCause.decode(buf);
+        case 18: return BlockPushReturnCode.decode(buf);
         default: throw new IllegalArgumentException("Unknown message type: " + type);
       }
     }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
index c8066d1..56c9a97 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
@@ -21,6 +21,9 @@ import java.net.ConnectException;
 
 import org.junit.Test;
 
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
+
 import static org.junit.Assert.*;
 
 /**
@@ -31,11 +34,13 @@ public class ErrorHandlerSuite {
   @Test
   public void testErrorRetry() {
     ErrorHandler.BlockPushErrorHandler pushHandler = new ErrorHandler.BlockPushErrorHandler();
-    assertFalse(pushHandler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
-      ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))));
+    assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
+    assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.STALE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldRetryError(new RuntimeException(new ConnectException())));
-    assertTrue(pushHandler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
-      ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))));
+    assertTrue(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
     assertTrue(pushHandler.shouldRetryError(new Throwable()));
 
     ErrorHandler.BlockFetchErrorHandler fetchHandler = new ErrorHandler.BlockFetchErrorHandler();
@@ -46,10 +51,12 @@ public class ErrorHandlerSuite {
   @Test
   public void testErrorLogging() {
     ErrorHandler.BlockPushErrorHandler pushHandler = new ErrorHandler.BlockPushErrorHandler();
-    assertFalse(pushHandler.shouldLogError(new RuntimeException(new IllegalArgumentException(
-      ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))));
-    assertFalse(pushHandler.shouldLogError(new RuntimeException(new IllegalArgumentException(
-      ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.STALE_BLOCK_PUSH, "")));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
     assertTrue(pushHandler.shouldLogError(new Throwable()));
 
     ErrorHandler.BlockFetchErrorHandler fetchHandler = new ErrorHandler.BlockFetchErrorHandler();
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
index d2fd5d9..2aadb77 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
@@ -36,7 +36,10 @@ import org.apache.spark.network.buffer.NettyManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
 import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
 import org.apache.spark.network.shuffle.protocol.PushBlockStream;
 
 
@@ -140,15 +143,16 @@ public class OneForOneBlockPusherSuite {
       BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(header);
       RpcResponseCallback callback = (RpcResponseCallback) invocation.getArguments()[2];
       Map.Entry<String, ManagedBuffer> entry = blockIterator.next();
+      String blockId = entry.getKey();
       ManagedBuffer block = entry.getValue();
       if (block != null && block.nioByteBuffer().capacity() > 0) {
-        callback.onSuccess(header);
+        callback.onSuccess(new BlockPushReturnCode(ReturnCode.SUCCESS.id(), "").toByteBuffer());
       } else if (block != null) {
-        callback.onFailure(new RuntimeException("Failed " + entry.getKey()
-          + ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX));
+        callback.onSuccess(new BlockPushReturnCode(
+          ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), blockId).toByteBuffer());
       } else {
-        callback.onFailure(new RuntimeException("Quick fail " + entry.getKey()
-          + ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+        callback.onFailure(new BlockPushNonFatalFailure(
+          ReturnCode.TOO_LATE_BLOCK_PUSH, ""));
       }
       assertEquals(msgIterator.next(), message);
       return null;
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 6bf39c8..46d6366 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -48,7 +48,10 @@ import static org.junit.Assert.*;
 
 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
 import org.apache.spark.network.shuffle.RemoteBlockPushResolver.MergeShuffleFile;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
 import org.apache.spark.network.shuffle.protocol.MergeStatuses;
@@ -103,6 +106,18 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
+  @Test
+  public void testErrorLogging() {
+    ErrorHandler.BlockPushErrorHandler errorHandler = RemoteBlockPushResolver.createErrorHandler();
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")));
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
+    assertTrue(errorHandler.shouldLogError(new Throwable()));
+  }
+
   @Test(expected = RuntimeException.class)
   public void testNoIndexFile() {
     try {
@@ -286,7 +301,7 @@ public class RemoteBlockPushResolverSuite {
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testBlockReceivedAfterMergeFinalize() throws IOException {
     ByteBuffer[] blocks = new ByteBuffer[]{
       ByteBuffer.wrap(new byte[4]),
@@ -304,13 +319,15 @@ public class RemoteBlockPushResolverSuite {
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
     try {
       stream1.onComplete(stream1.getID());
-    } catch (RuntimeException re) {
-      assertEquals("Block shufflePush_0_0_1_0 received after merged shuffle is finalized or stale"
-        + " block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being"
-          + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
       MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
       validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
-      throw re;
+      throw e;
     }
   }
 
@@ -348,7 +365,7 @@ public class RemoteBlockPushResolverSuite {
     assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testCollision() throws IOException {
     StreamCallbackWithID stream1 =
       pushResolver.receiveBlockDataAsStream(
@@ -362,15 +379,17 @@ public class RemoteBlockPushResolverSuite {
     // Since stream2 didn't get any opportunity it will throw couldn't find opportunity error
     try {
       stream2.onComplete(stream2.getID());
-    } catch (RuntimeException re) {
-      assertEquals(
-        "Couldn't find an opportunity to write block shufflePush_0_0_1_0 to merged shuffle",
-        re.getMessage());
-      throw re;
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream2.getID());
+      throw e;
     }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throws IOException {
     StreamCallbackWithID stream1 =
       pushResolver.receiveBlockDataAsStream(
@@ -387,14 +406,16 @@ public class RemoteBlockPushResolverSuite {
     // This should be deferred
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[5]));
     // Since this stream didn't get any opportunity it will throw couldn't find opportunity error
-    RuntimeException failedEx = null;
+    BlockPushNonFatalFailure failedEx = null;
     try {
       stream3.onComplete(stream3.getID());
-    } catch (RuntimeException re) {
-      assertEquals(
-        "Couldn't find an opportunity to write block shufflePush_0_0_2_0 to merged shuffle",
-        re.getMessage());
-      failedEx = re;
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream3.getID());
+      failedEx = e;
     }
     // stream 1 now completes
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
@@ -871,7 +892,7 @@ public class RemoteBlockPushResolverSuite {
     removeApplication(TEST_APP);
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws IOException {
     StreamCallbackWithID stream1 =
       pushResolver.receiveBlockDataAsStream(
@@ -895,14 +916,16 @@ public class RemoteBlockPushResolverSuite {
         new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 2, 0, 0));
     // This should be deferred as stream 2 is still the active stream
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
-    RuntimeException failedEx = null;
+    BlockPushNonFatalFailure failedEx = null;
     try {
       stream3.onComplete(stream3.getID());
-    } catch (RuntimeException re) {
-      assertEquals(
-        "Couldn't find an opportunity to write block shufflePush_0_0_2_0 to merged shuffle",
-        re.getMessage());
-      failedEx = re;
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream3.getID());
+      failedEx = e;
     }
     // Stream 2 writes more and completes
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4]));
@@ -1072,10 +1095,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       // stream 1 push should be rejected as it is from an older shuffleMergeId
       stream1.onComplete(stream1.getID());
-    } catch(RuntimeException re) {
-      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle is finalized or"
-        + " stale block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being"
-          + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
     }
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
@@ -1099,10 +1124,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       // stream 1 push should be rejected as it is from an older shuffleMergeId
       stream1.onComplete(stream1.getID());
-    } catch(RuntimeException re) {
-      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle is finalized or"
-        + " stale block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being"
-          + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
     }
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
@@ -1153,10 +1180,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       // stream 1 push should be rejected as it is from an older shuffleMergeId
       stream1.onComplete(stream1.getID());
-    } catch(RuntimeException re) {
-      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle is finalized or"
-        + " stale block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being"
-          + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
     }
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index ecaa4f0..e6af767 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -24,8 +24,6 @@ import java.util.concurrent.ExecutorService
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 
-import com.google.common.base.Throwables
-
 import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv}
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
@@ -33,6 +31,8 @@ import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.server.BlockPushNonFatalFailure
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
 import org.apache.spark.network.shuffle.BlockPushingListener
 import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
 import org.apache.spark.network.util.TransportConf
@@ -78,10 +78,12 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         if (t.getCause != null && t.getCause.isInstanceOf[FileNotFoundException]) {
           return false
         }
-        val errorStackTraceString = Throwables.getStackTraceAsString(t)
         // If the block is too late or the invalid block push, there is no need to retry it
-        !errorStackTraceString.contains(
-          BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX)
+        !(t.isInstanceOf[BlockPushNonFatalFailure] &&
+          (t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
+            == ReturnCode.TOO_LATE_BLOCK_PUSH ||
+            t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
+            == ReturnCode.STALE_BLOCK_PUSH))
       }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index 26cdad8..6f9b5e4 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -33,8 +33,9 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark._
 import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.server.BlockPushNonFatalFailure
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
 import org.apache.spark.network.shuffle.{BlockPushingListener, BlockStoreClient}
-import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.shuffle.ShuffleBlockPusher.PushRequest
@@ -219,13 +220,15 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
     val pusher = new ShuffleBlockPusher(conf)
     val errorHandler = pusher.createErrorHandler()
     assert(
-      !errorHandler.shouldRetryError(new RuntimeException(
-        new IllegalArgumentException(
-          BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))))
+      !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
+    assert(
+      !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.STALE_BLOCK_PUSH, "")))
     assert(errorHandler.shouldRetryError(new RuntimeException(new ConnectException())))
     assert(
-      errorHandler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
-        BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))))
+      errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")))
     assert (errorHandler.shouldRetryError(new Throwable()))
   }
 
@@ -233,12 +236,13 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
     val pusher = new ShuffleBlockPusher(conf)
     val errorHandler = pusher.createErrorHandler()
     assert(
-      !errorHandler.shouldLogError(new RuntimeException(
-        new IllegalArgumentException(
-          BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))))
-    assert(!errorHandler.shouldLogError(new RuntimeException(
-      new IllegalArgumentException(
-        BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))))
+      !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+        ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
+    assert(
+      !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+        ReturnCode.STALE_BLOCK_PUSH, "")))
+    assert(!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")))
     assert(errorHandler.shouldLogError(new Throwable()))
   }
 
@@ -255,9 +259,8 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
           if (failBlock) {
             failBlock = false
             // Fail the first block with the collision exception.
-            blockPushListener.onBlockPushFailure(blockId, new RuntimeException(
-              new IllegalArgumentException(
-                BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX)))
+            blockPushListener.onBlockPushFailure(blockId, new BlockPushNonFatalFailure(
+              ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, ""))
           } else {
             pushedBlocks += blockId
             blockPushListener.onBlockPushSuccess(blockId, mock(classOf[ManagedBuffer]))
@@ -285,9 +288,8 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
           if (failBlock) {
             failBlock = false
             // Fail the first block with the too late exception.
-            blockPushListener.onBlockPushFailure(blockId, new RuntimeException(
-              new IllegalArgumentException(
-                BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX)))
+            blockPushListener.onBlockPushFailure(blockId, new BlockPushNonFatalFailure(
+              ReturnCode.TOO_LATE_BLOCK_PUSH, ""))
           } else {
             pushedBlocks += blockId
             blockPushListener.onBlockPushSuccess(blockId, mock(classOf[ManagedBuffer]))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org