You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2021/08/16 18:59:25 UTC
[spark] branch master updated: [SPARK-35548][CORE][SHUFFLE]
Handling new attempt has started error message in BlockPushErrorHandler in
client
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 05cd5f9 [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client
05cd5f9 is described below
commit 05cd5f97c3dea25dacdbdb319243cdab9667c774
Author: zhuqi-lucas <82...@qq.com>
AuthorDate: Mon Aug 16 13:58:48 2021 -0500
[SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client
### What changes were proposed in this pull request?
Add a new type of error message in BlockPushErrorHandler which indicates that a PushBlockStream message was received after a new application attempt had already started. The client should handle this error correctly, without retrying the block push.
### Why are the changes needed?
When a block push fails because it comes from an older (no longer current) application attempt, the client should neither retry pushing the block nor log the exception.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added the corresponding unit tests.
Closes #33617 from zhuqi-lucas/master.
Authored-by: zhuqi-lucas <82...@qq.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../network/server/BlockPushNonFatalFailure.java | 22 +++++++++++++++-
.../network/server/TransportRequestHandler.java | 13 ++++++++--
.../apache/spark/network/shuffle/ErrorHandler.java | 7 ++---
.../network/shuffle/RemoteBlockPushResolver.java | 30 ++++++++++++----------
.../spark/network/shuffle/ErrorHandlerSuite.java | 4 +++
.../shuffle/RemoteBlockPushResolverSuite.java | 17 +++++++-----
.../apache/spark/shuffle/ShuffleBlockPusher.scala | 10 +++-----
.../spark/shuffle/ShuffleBlockPusherSuite.scala | 6 +++++
8 files changed, 77 insertions(+), 32 deletions(-)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
index 5906fa2..4bb22b2 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
@@ -40,6 +40,14 @@ public class BlockPushNonFatalFailure extends RuntimeException {
" is received after merged shuffle is finalized";
/**
+ * String constant used for generating exception messages indicating the application attempt is
+ * not the latest attempt on the server side. When we get a block push failure because of the too
+ * old attempt, we will not retry pushing the block nor log the exception on the client side.
+ */
+ public static final String TOO_OLD_ATTEMPT_SUFFIX =
+ " is from an older app attempt";
+
+ /**
* String constant used for generating exception messages indicating a block to be merged
* is a stale block push in the case of indeterminate stage retries on the server side.
* When we get a block push failure because of the block push being stale, we will not
@@ -124,7 +132,12 @@ public class BlockPushNonFatalFailure extends RuntimeException {
* indeterminate stage retries. When the client receives this code, it will not retry
* pushing the block.
*/
- STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+ STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX),
+ /**
+ * Indicate the application attempt is not the latest attempt on the server side.
+ * When the client gets this code, it will not retry pushing the block.
+ */
+ TOO_OLD_ATTEMPT_PUSH(4, TOO_OLD_ATTEMPT_SUFFIX);
private final byte id;
// Error message suffix used to generate an error message for a given ReturnCode and
@@ -146,10 +159,17 @@ public class BlockPushNonFatalFailure extends RuntimeException {
case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
case 3: return ReturnCode.STALE_BLOCK_PUSH;
+ case 4: return ReturnCode.TOO_OLD_ATTEMPT_PUSH;
default: throw new IllegalArgumentException("Unknown block push return code: " + id);
}
}
+ public static boolean shouldNotRetryErrorCode(ReturnCode returnCode) {
+ return returnCode == ReturnCode.TOO_LATE_BLOCK_PUSH ||
+ returnCode == ReturnCode.STALE_BLOCK_PUSH ||
+ returnCode == ReturnCode.TOO_OLD_ATTEMPT_PUSH;
+ }
+
public static String getErrorMsg(String blockId, ReturnCode errorCode) {
Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
return "Block " + blockId + errorCode.errorMsgSuffix;
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 5c07f20..bc99248 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -249,8 +249,17 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
wrappedCallback.onComplete(wrappedCallback.getID());
}
} catch (Exception e) {
- logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
- respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+ if (e instanceof BlockPushNonFatalFailure) {
+ // Thrown by rpcHandler.receiveStream(reverseClient, meta, callback), the same as
+ // onComplete method. Respond an RPC message with the error code to client instead of
+ // using exceptions encoded in the RPCFailure. Using a proper RPCResponse is more
+ // efficient, and now only include the too old attempt case here.
+ respond(new RpcResponse(req.requestId,
+ new NioManagedBuffer(((BlockPushNonFatalFailure) e).getResponse())));
+ } else {
+ logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
+ respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+ }
// We choose to totally fail the channel, rather than trying to recover as we do in other
// cases. We don't know how many bytes of the stream the client has already sent for the
// stream, it's not worth trying to recover.
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 271d762..9136ff6 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -87,10 +87,11 @@ public interface ErrorHandler {
return false;
}
- // If the block is too late or stale block push, there is no need to retry it
+ // If the block is too late or the invalid block push or the attempt is not the latest one,
+ // there is no need to retry it
return !(t instanceof BlockPushNonFatalFailure &&
- (((BlockPushNonFatalFailure) t).getReturnCode() == TOO_LATE_BLOCK_PUSH ||
- ((BlockPushNonFatalFailure) t).getReturnCode() == STALE_BLOCK_PUSH));
+ BlockPushNonFatalFailure
+ .shouldNotRetryErrorCode(((BlockPushNonFatalFailure) t).getReturnCode()));
}
@Override
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 30777ca..e7fe8e2 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -395,19 +395,17 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
- if (appShuffleInfo.attemptId != msg.appAttemptId) {
- // If this Block belongs to a former application attempt, it is considered late,
- // as only the blocks from the current application attempt will be merged
- // TODO: [SPARK-35548] Client should be updated to handle this error.
- throw new IllegalArgumentException(
- String.format("The attempt id %s in this PushBlockStream message does not match "
- + "with the current attempt id %s stored in shuffle service for application %s",
- msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
- }
// Use string concatenation here to avoid the overhead with String.format on every
// pushed block.
final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + "_"
+ msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" + msg.reduceId;
+ if (appShuffleInfo.attemptId != msg.appAttemptId) {
+ // If this Block belongs to a former application attempt, it is considered late,
+ // as only the blocks from the current application attempt will be merged
+ throw new BlockPushNonFatalFailure(new BlockPushReturnCode(ReturnCode
+ .TOO_OLD_ATTEMPT_PUSH.id(), streamId).toByteBuffer(),
+ BlockPushNonFatalFailure.getErrorMsg(streamId, ReturnCode.TOO_OLD_ATTEMPT_PUSH));
+ }
// Retrieve merged shuffle file metadata
AppShufflePartitionInfo partitionInfoBeforeCheck;
BlockPushNonFatalFailure failure = null;
@@ -513,12 +511,18 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
if (appShuffleInfo.attemptId != msg.appAttemptId) {
- // If this Block belongs to a former application attempt, it is considered late,
- // as only the blocks from the current application attempt will be merged
- // TODO: [SPARK-35548] Client should be updated to handle this error.
+ // If finalizeShuffleMerge from a former application attempt, it is considered late,
+ // as only the finalizeShuffleMerge request from the current application attempt
+ // will be merged. Too old app attempt only being seen by an already failed
+ // app attempt, and no need use callback to return to client now, because
+ // the finalizeShuffleMerge in DAGScheduler has no retry policy, and don't
+ // use the BlockPushNonFatalFailure because it's the finalizeShuffleMerge
+ // related case, not the block push case, just throw it in server side now.
+ // TODO we may use a new exception class to include the finalizeShuffleMerge
+ // related case just as the BlockPushNonFatalFailure contains the block push cases.
throw new IllegalArgumentException(
String.format("The attempt id %s in this FinalizeShuffleMerge message does not match "
- + "with the current attempt id %s stored in shuffle service for application %s",
+ + "with the current attempt id %s stored in shuffle service for application %s",
msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
}
AtomicReference<Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitionsRef =
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
index 56c9a97..246fda6 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
@@ -37,6 +37,8 @@ public class ErrorHandlerSuite {
assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+ ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+ assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldRetryError(new RuntimeException(new ConnectException())));
assertTrue(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
@@ -54,6 +56,8 @@ public class ErrorHandlerSuite {
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+ ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+ assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 46b43bc..f4a29aa 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -112,6 +112,8 @@ public class RemoteBlockPushResolverSuite {
assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+ BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+ assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")));
assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
@@ -939,7 +941,7 @@ public class RemoteBlockPushResolverSuite {
}
}
- @Test(expected = IllegalArgumentException.class)
+ @Test(expected = BlockPushNonFatalFailure.class)
public void testPushBlockFromPreviousAttemptIsRejected()
throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
@@ -998,11 +1000,12 @@ public class RemoteBlockPushResolverSuite {
try {
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(testApp, 1, 0, 0, 1, 0, 0));
- } catch (IllegalArgumentException re) {
- assertEquals(
- "The attempt id 1 in this PushBlockStream message does not match " +
- "with the current attempt id 2 stored in shuffle service for application " +
- testApp, re.getMessage());
+ } catch (BlockPushNonFatalFailure re) {
+ BlockPushReturnCode errorCode =
+ (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(re.getResponse());
+ assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH.id(),
+ errorCode.returnCode);
+ assertEquals(errorCode.failureBlockId, stream2.getID());
throw re;
}
}
@@ -1034,7 +1037,7 @@ public class RemoteBlockPushResolverSuite {
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
String.format("The attempt id %s in this FinalizeShuffleMerge message does not " +
- "match with the current attempt id %s stored in shuffle service for application %s",
+ "match with the current attempt id %s stored in shuffle service for application %s",
ATTEMPT_ID_1, ATTEMPT_ID_2, testApp));
throw e;
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index e6af767..bb260f8 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -32,7 +32,6 @@ import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.server.BlockPushNonFatalFailure
-import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
import org.apache.spark.network.shuffle.BlockPushingListener
import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
import org.apache.spark.network.util.TransportConf
@@ -78,12 +77,11 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
if (t.getCause != null && t.getCause.isInstanceOf[FileNotFoundException]) {
return false
}
- // If the block is too late or the invalid block push, there is no need to retry it
+ // If the block is too late or the invalid block push or the attempt is not the latest one,
+ // there is no need to retry it
!(t.isInstanceOf[BlockPushNonFatalFailure] &&
- (t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
- == ReturnCode.TOO_LATE_BLOCK_PUSH ||
- t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
- == ReturnCode.STALE_BLOCK_PUSH))
+ BlockPushNonFatalFailure.
+ shouldNotRetryErrorCode(t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode));
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index 6f9b5e4..298ba50 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -224,6 +224,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
assert(
!errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+ ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")))
+ assert(
+ !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")))
assert(errorHandler.shouldRetryError(new RuntimeException(new ConnectException())))
assert(
@@ -240,6 +243,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
assert(
!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+ ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")))
+ assert(
+ !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")))
assert(!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org