You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2021/08/16 18:59:25 UTC

[spark] branch master updated: [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 05cd5f9  [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client
05cd5f9 is described below

commit 05cd5f97c3dea25dacdbdb319243cdab9667c774
Author: zhuqi-lucas <82...@qq.com>
AuthorDate: Mon Aug 16 13:58:48 2021 -0500

    [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client
    
    ### What changes were proposed in this pull request?
    Add a new type of error message in BlockPushErrorHandler that indicates a PushBlockStream message was received after a new application attempt has started. The client should handle this error message correctly, without retrying the block push.
    
    ### Why are the changes needed?
    When a block push fails because it comes from an older application attempt, the client should neither retry pushing the block nor log the exception.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added the corresponding unit tests.
    
    Closes #33617 from zhuqi-lucas/master.
    
    Authored-by: zhuqi-lucas <82...@qq.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/server/BlockPushNonFatalFailure.java   | 22 +++++++++++++++-
 .../network/server/TransportRequestHandler.java    | 13 ++++++++--
 .../apache/spark/network/shuffle/ErrorHandler.java |  7 ++---
 .../network/shuffle/RemoteBlockPushResolver.java   | 30 ++++++++++++----------
 .../spark/network/shuffle/ErrorHandlerSuite.java   |  4 +++
 .../shuffle/RemoteBlockPushResolverSuite.java      | 17 +++++++-----
 .../apache/spark/shuffle/ShuffleBlockPusher.scala  | 10 +++-----
 .../spark/shuffle/ShuffleBlockPusherSuite.scala    |  6 +++++
 8 files changed, 77 insertions(+), 32 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
index 5906fa2..4bb22b2 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
@@ -40,6 +40,14 @@ public class BlockPushNonFatalFailure extends RuntimeException {
     " is received after merged shuffle is finalized";
 
   /**
+   * String constant used for generating exception messages indicating the application attempt is
+   * not the latest attempt on the server side. When we get a block push failure because of the too
+   * old attempt, we will not retry pushing the block nor log the exception on the client side.
+   */
+  public static final String TOO_OLD_ATTEMPT_SUFFIX =
+    " is from an older app attempt";
+
+  /**
    * String constant used for generating exception messages indicating a block to be merged
    * is a stale block push in the case of indeterminate stage retries on the server side.
    * When we get a block push failure because of the block push being stale, we will not
@@ -124,7 +132,12 @@ public class BlockPushNonFatalFailure extends RuntimeException {
      * indeterminate stage retries. When the client receives this code, it will not retry
      * pushing the block.
      */
-    STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+    STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX),
+    /**
+     * Indicate the application attempt is not the latest attempt on the server side.
+     * When the client gets this code, it will not retry pushing the block.
+     */
+    TOO_OLD_ATTEMPT_PUSH(4, TOO_OLD_ATTEMPT_SUFFIX);
 
     private final byte id;
     // Error message suffix used to generate an error message for a given ReturnCode and
@@ -146,10 +159,17 @@ public class BlockPushNonFatalFailure extends RuntimeException {
       case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
       case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
       case 3: return ReturnCode.STALE_BLOCK_PUSH;
+      case 4: return ReturnCode.TOO_OLD_ATTEMPT_PUSH;
       default: throw new IllegalArgumentException("Unknown block push return code: " + id);
     }
   }
 
+  public static boolean shouldNotRetryErrorCode(ReturnCode returnCode) {
+    return returnCode == ReturnCode.TOO_LATE_BLOCK_PUSH ||
+      returnCode == ReturnCode.STALE_BLOCK_PUSH ||
+      returnCode == ReturnCode.TOO_OLD_ATTEMPT_PUSH;
+  }
+
   public static String getErrorMsg(String blockId, ReturnCode errorCode) {
     Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
     return "Block " + blockId + errorCode.errorMsgSuffix;
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 5c07f20..bc99248 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -249,8 +249,17 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
         wrappedCallback.onComplete(wrappedCallback.getID());
       }
     } catch (Exception e) {
-      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      if (e instanceof BlockPushNonFatalFailure) {
+        // Thrown by rpcHandler.receiveStream(reverseClient, meta, callback), the same as in the
+        // onComplete method. Respond to the client with an RPC message carrying the error code
+        // instead of encoding the exception in an RpcFailure. Using a proper RpcResponse is more
+        // efficient; currently only the too old attempt case is handled here.
+        respond(new RpcResponse(req.requestId,
+          new NioManagedBuffer(((BlockPushNonFatalFailure) e).getResponse())));
+      } else {
+        logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
+        respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      }
       // We choose to totally fail the channel, rather than trying to recover as we do in other
       // cases.  We don't know how many bytes of the stream the client has already sent for the
       // stream, it's not worth trying to recover.
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 271d762..9136ff6 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -87,10 +87,11 @@ public interface ErrorHandler {
         return false;
       }
 
-      // If the block is too late or stale block push, there is no need to retry it
+      // If the block is too late or the invalid block push or the attempt is not the latest one,
+      // there is no need to retry it
       return !(t instanceof BlockPushNonFatalFailure &&
-        (((BlockPushNonFatalFailure) t).getReturnCode() == TOO_LATE_BLOCK_PUSH ||
-          ((BlockPushNonFatalFailure) t).getReturnCode() == STALE_BLOCK_PUSH));
+        BlockPushNonFatalFailure
+          .shouldNotRetryErrorCode(((BlockPushNonFatalFailure) t).getReturnCode()));
     }
 
     @Override
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 30777ca..e7fe8e2 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -395,19 +395,17 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
-    if (appShuffleInfo.attemptId != msg.appAttemptId) {
-      // If this Block belongs to a former application attempt, it is considered late,
-      // as only the blocks from the current application attempt will be merged
-      // TODO: [SPARK-35548] Client should be updated to handle this error.
-      throw new IllegalArgumentException(
-        String.format("The attempt id %s in this PushBlockStream message does not match "
-          + "with the current attempt id %s stored in shuffle service for application %s",
-          msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
-    }
     // Use string concatenation here to avoid the overhead with String.format on every
     // pushed block.
     final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + "_"
       + msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" + msg.reduceId;
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      // If this Block belongs to a former application attempt, it is considered late,
+      // as only the blocks from the current application attempt will be merged
+      throw new BlockPushNonFatalFailure(new BlockPushReturnCode(ReturnCode
+        .TOO_OLD_ATTEMPT_PUSH.id(), streamId).toByteBuffer(),
+        BlockPushNonFatalFailure.getErrorMsg(streamId, ReturnCode.TOO_OLD_ATTEMPT_PUSH));
+    }
     // Retrieve merged shuffle file metadata
     AppShufflePartitionInfo partitionInfoBeforeCheck;
     BlockPushNonFatalFailure failure = null;
@@ -513,12 +511,18 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
     if (appShuffleInfo.attemptId != msg.appAttemptId) {
-      // If this Block belongs to a former application attempt, it is considered late,
-      // as only the blocks from the current application attempt will be merged
-      // TODO: [SPARK-35548] Client should be updated to handle this error.
+      // If the finalizeShuffleMerge request is from a former application attempt, it is
+      // considered late, as only the finalizeShuffleMerge request from the current
+      // application attempt will be handled. A too old app attempt can only be seen by an
+      // already failed app attempt, so there is no need to use the callback to return to the
+      // client now, because finalizeShuffleMerge in DAGScheduler has no retry policy. We also
+      // don't use BlockPushNonFatalFailure because this is the finalizeShuffleMerge related
+      // case, not the block push case, so the exception is just thrown on the server side.
+      // TODO we may use a new exception class to include the finalizeShuffleMerge
+      // related case just as the BlockPushNonFatalFailure contains the block push cases.
       throw new IllegalArgumentException(
         String.format("The attempt id %s in this FinalizeShuffleMerge message does not match "
-          + "with the current attempt id %s stored in shuffle service for application %s",
+            + "with the current attempt id %s stored in shuffle service for application %s",
           msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
     }
     AtomicReference<Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitionsRef =
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
index 56c9a97..246fda6 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
@@ -37,6 +37,8 @@ public class ErrorHandlerSuite {
     assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
       ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+    assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
       ReturnCode.STALE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldRetryError(new RuntimeException(new ConnectException())));
     assertTrue(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
@@ -54,6 +56,8 @@ public class ErrorHandlerSuite {
     assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
       ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
       ReturnCode.STALE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
       ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 46b43bc..f4a29aa 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -112,6 +112,8 @@ public class RemoteBlockPushResolverSuite {
     assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
       BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
     assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
       BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")));
     assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
       BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
@@ -939,7 +941,7 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test(expected = IllegalArgumentException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testPushBlockFromPreviousAttemptIsRejected()
       throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
@@ -998,11 +1000,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       pushResolver.receiveBlockDataAsStream(
         new PushBlockStream(testApp, 1, 0, 0, 1, 0, 0));
-    } catch (IllegalArgumentException re) {
-      assertEquals(
-        "The attempt id 1 in this PushBlockStream message does not match " +
-          "with the current attempt id 2 stored in shuffle service for application " +
-          testApp, re.getMessage());
+    } catch (BlockPushNonFatalFailure re) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(re.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream2.getID());
       throw re;
     }
   }
@@ -1034,7 +1037,7 @@ public class RemoteBlockPushResolverSuite {
     } catch (IllegalArgumentException e) {
       assertEquals(e.getMessage(),
         String.format("The attempt id %s in this FinalizeShuffleMerge message does not " +
-          "match with the current attempt id %s stored in shuffle service for application %s",
+            "match with the current attempt id %s stored in shuffle service for application %s",
           ATTEMPT_ID_1, ATTEMPT_ID_2, testApp));
       throw e;
     }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index e6af767..bb260f8 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -32,7 +32,6 @@ import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.BlockPushNonFatalFailure
-import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
 import org.apache.spark.network.shuffle.BlockPushingListener
 import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
 import org.apache.spark.network.util.TransportConf
@@ -78,12 +77,11 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         if (t.getCause != null && t.getCause.isInstanceOf[FileNotFoundException]) {
           return false
         }
-        // If the block is too late or the invalid block push, there is no need to retry it
+        // If the block is too late or the invalid block push or the attempt is not the latest one,
+        // there is no need to retry it
         !(t.isInstanceOf[BlockPushNonFatalFailure] &&
-          (t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
-            == ReturnCode.TOO_LATE_BLOCK_PUSH ||
-            t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
-            == ReturnCode.STALE_BLOCK_PUSH))
+          BlockPushNonFatalFailure.
+            shouldNotRetryErrorCode(t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode));
       }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index 6f9b5e4..298ba50 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -224,6 +224,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
         ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
     assert(
       !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")))
+    assert(
+      !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
         ReturnCode.STALE_BLOCK_PUSH, "")))
     assert(errorHandler.shouldRetryError(new RuntimeException(new ConnectException())))
     assert(
@@ -240,6 +243,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
         ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
     assert(
       !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+        ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")))
+    assert(
+      !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
         ReturnCode.STALE_BLOCK_PUSH, "")))
     assert(!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
       ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org