You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2022/01/29 03:14:43 UTC
[spark] branch branch-3.2 updated: [SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files once the shuffle is finalized
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 837c6b4 [SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files once the shuffle is finalized
837c6b4 is described below
commit 837c6b436745cb9fb8411b30e0a26144a46a3589
Author: Chandni Singh <si...@gmail.com>
AuthorDate: Fri Jan 28 21:12:28 2022 -0600
[SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files once the shuffle is finalized
### What changes were proposed in this pull request?
This fixes the bugs that were reported in SPARK-37675 and SPARK-37793.
- Empty merged partitions were reported by the shuffle server to the driver.
- The push merged files were getting overwritten after a shuffle merge is finalized.
- Throwing exception in the finalization of a shuffle for which the shuffle server didn't receive any blocks.
### Why are the changes needed?
Changes are need to fix the bug.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Have added unit test.
Closes #35325 from otterc/SPARK-37675.
Authored-by: Chandni Singh <si...@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 9afb407fa7aaf2f0961661b5d8cfbec549e591ee)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
.../network/shuffle/RemoteBlockPushResolver.java | 171 ++++++++++-----------
.../shuffle/RemoteBlockPushResolverSuite.java | 77 +++++++++-
2 files changed, 158 insertions(+), 90 deletions(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index d04db67..c1d4d5b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -82,20 +82,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
public static final String MERGE_DIR_KEY = "mergeDir";
public static final String ATTEMPT_ID_KEY = "attemptId";
private static final int UNDEFINED_ATTEMPT_ID = -1;
- // Shuffles of determinate stages will have shuffleMergeId set to 0
- private static final int DETERMINATE_SHUFFLE_MERGE_ID = 0;
private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler();
// ByteBuffer to respond to client upon a successful merge of a pushed block
private static final ByteBuffer SUCCESS_RESPONSE =
new BlockPushReturnCode(ReturnCode.SUCCESS.id(), "").toByteBuffer().asReadOnlyBuffer();
- // ConcurrentHashMap doesn't allow null for keys or values which is why this is required.
- // Marker to identify finalized indeterminate shuffle partitions in the case of indeterminate
- // stage retries.
- @VisibleForTesting
- public static final Map<Integer, AppShufflePartitionInfo> INDETERMINATE_SHUFFLE_FINALIZED =
- Collections.emptyMap();
-
/**
* A concurrent hashmap where the key is the applicationId, and the value includes
* all the merged shuffle information for this application. AppShuffleInfo stores
@@ -168,59 +159,45 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
String blockId) throws BlockPushNonFatalFailure {
ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> shuffles = appShuffleInfo.shuffles;
AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
- shuffles.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
- if (appShuffleMergePartitionsInfo == null) {
- File dataFile =
- appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
- // If this partition is already finalized then the partitions map will not contain the
- // shuffleId for determinate stages but the data file would exist.
- // In that case the block is considered late. In the case of indeterminate stages, most
- // recent shuffleMergeId finalized would be pointing to INDETERMINATE_SHUFFLE_FINALIZED
- if (dataFile.exists()) {
- throw new BlockPushNonFatalFailure(new BlockPushReturnCode(
- ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
- BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH));
- } else {
- logger.info("Creating a new attempt for shuffle blocks push request for shuffle {}"
- + " with shuffleMergeId {} for application {}_{}", shuffleId, shuffleMergeId,
- appShuffleInfo.appId, appShuffleInfo.attemptId);
- return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
- }
+ shuffles.compute(shuffleId, (id, mergePartitionsInfo) -> {
+ if (mergePartitionsInfo == null) {
+ logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a new shuffle " +
+ "merge metadata", appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId,
+ shuffleMergeId);
+ return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
} else {
- // Reject the request as we have already seen a higher shuffleMergeId than the
- // current incoming one
- int latestShuffleMergeId = appShuffleMergePartitionsInfo.shuffleMergeId;
+ int latestShuffleMergeId = mergePartitionsInfo.shuffleMergeId;
if (latestShuffleMergeId > shuffleMergeId) {
+ // Reject the request as we have already seen a higher shuffleMergeId than the one
+ // in the current request.
throw new BlockPushNonFatalFailure(
new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.STALE_BLOCK_PUSH));
- } else if (latestShuffleMergeId == shuffleMergeId) {
- return appShuffleMergePartitionsInfo;
- } else {
+ } else if (latestShuffleMergeId < shuffleMergeId){
// Higher shuffleMergeId seen for the shuffle ID meaning new stage attempt is being
// run for the shuffle ID. Close and clean up old shuffleMergeId files,
// happens in the indeterminate stage retries
- logger.info("Creating a new attempt for shuffle blocks push request for shuffle {}"
- + " with shuffleMergeId {} for application {}_{} since it is higher than the"
- + " latest shuffleMergeId {} already seen", shuffleId, shuffleMergeId,
- appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId);
+ logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a new shuffle " +
+ "merge metadata since received shuffleMergeId is higher than latest " +
+ "shuffleMergeId {}", appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId,
+ shuffleMergeId, latestShuffleMergeId);
mergedShuffleCleaner.execute(() ->
- closeAndDeletePartitionFiles(appShuffleMergePartitionsInfo.shuffleMergePartitions));
+ closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
+ } else {
+ // The request is for block with same shuffleMergeId as the latest shuffleMergeId
+ if (mergePartitionsInfo.isFinalized()) {
+ throw new BlockPushNonFatalFailure(
+ new BlockPushReturnCode(
+ ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
+ BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH));
+ }
+ return mergePartitionsInfo;
}
}
});
-
- // It only gets here when the shuffle is already finalized.
- if (null == shufflePartitionsWithMergeId ||
- INDETERMINATE_SHUFFLE_FINALIZED == shufflePartitionsWithMergeId.shuffleMergePartitions) {
- throw new BlockPushNonFatalFailure(
- new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
- BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH));
- }
-
Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
- shufflePartitionsWithMergeId.shuffleMergePartitions;
+ shufflePartitionsWithMergeId.shuffleMergePartitions;
return shuffleMergePartitions.computeIfAbsent(reduceId, key -> {
// It only gets here when the key is not present in the map. The first time the merge
// manager receives a pushed block for a given application shuffle partition.
@@ -234,9 +211,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId, shuffleMergeId,
reduceId, dataFile, indexFile, metaFile);
} catch (IOException e) {
- logger.error(
- "Cannot create merged shuffle partition with data file {}, index file {}, and "
- + "meta file {}", dataFile.getAbsolutePath(),
+ logger.error("{} attempt {} shuffle {} shuffleMerge {}: cannot create merged shuffle " +
+ "partition with data file {}, index file {}, and meta file {}", appShuffleInfo.appId,
+ appShuffleInfo.attemptId, shuffleId, shuffleMergeId, dataFile.getAbsolutePath(),
indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
throw new RuntimeException(
String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s "
@@ -349,6 +326,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
* If cleanupLocalDirs is true, the merged shuffle files will also be deleted.
* The cleanup will be executed in a separate thread.
*/
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@VisibleForTesting
void closeAndDeletePartitionFilesIfNeeded(
AppShuffleInfo appShuffleInfo,
@@ -511,10 +489,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
}
}
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
- logger.info("Finalizing shuffle {} with shuffleMergeId {} from Application {}_{}.",
- msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
+ logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalize shuffle merge",
+ msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
if (appShuffleInfo.attemptId != msg.appAttemptId) {
// If finalizeShuffleMerge from a former application attempt, it is considered late,
@@ -533,35 +512,33 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
}
AtomicReference<Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitionsRef =
new AtomicReference<>(null);
- // Metadata of the determinate stage shuffle can be safely removed as part of finalizing
- // shuffle merge. Currently once the shuffle is finalized for a determinate stages, retry
- // stages of the same shuffle will have shuffle push disabled.
- if (msg.shuffleMergeId == DETERMINATE_SHUFFLE_MERGE_ID) {
- AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo =
- appShuffleInfo.shuffles.remove(msg.shuffleId);
- if (appShuffleMergePartitionsInfo != null) {
- shuffleMergePartitionsRef.set(appShuffleMergePartitionsInfo.shuffleMergePartitions);
- }
- } else {
- appShuffleInfo.shuffles.compute(msg.shuffleId, (id, value) -> {
- if (null == value || msg.shuffleMergeId < value.shuffleMergeId ||
- INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) {
+ appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+ if (null != mergePartitionsInfo) {
+ if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId ||
+ mergePartitionsInfo.isFinalized()) {
throw new RuntimeException(String.format(
- "Shuffle merge finalize request for shuffle %s with" + " shuffleMergeId %s is %s",
- msg.shuffleId, msg.shuffleMergeId,
- ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
- } else if (msg.shuffleMergeId > value.shuffleMergeId) {
+ "Shuffle merge finalize request for shuffle %s with" + " shuffleMergeId %s is %s",
+ msg.shuffleId, msg.shuffleMergeId,
+ ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
+ } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
// If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
// empty MergeStatuses but cleanup the older shuffleMergeId files.
mergedShuffleCleaner.execute(() ->
- closeAndDeletePartitionFiles(value.shuffleMergePartitions));
- return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
} else {
- shuffleMergePartitionsRef.set(value.shuffleMergePartitions);
- return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ // This block covers:
+ // 1. finalization of determinate stage
+ // 2. finalization of indeterminate stage if the shuffleMergeId related to it is the one
+ // for which the message is received.
+ shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
}
- });
- }
+ }
+ // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results
+ // sent to the driver will be empty. This cam happen when the service didn't receive any
+ // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the
+ // shuffle.
+ return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ });
Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions = shuffleMergePartitionsRef.get();
MergeStatuses mergeStatuses;
if (null == shuffleMergePartitions || shuffleMergePartitions.isEmpty()) {
@@ -575,14 +552,25 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
synchronized (partition) {
try {
+ logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalizing shuffle " +
+ "partition {} ", msg.appId, msg.appAttemptId, msg.shuffleId,
+ msg.shuffleMergeId, partition.reduceId);
// This can throw IOException which will marks this shuffle partition as not merged.
partition.finalizePartition();
- bitmaps.add(partition.mapTracker);
- reduceIds.add(partition.reduceId);
- sizes.add(partition.getLastChunkOffset());
+ if (partition.mapTracker.getCardinality() > 0) {
+ bitmaps.add(partition.mapTracker);
+ reduceIds.add(partition.reduceId);
+ sizes.add(partition.getLastChunkOffset());
+ logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalization results " +
+ "added for partition {} data size {} index size {} meta size {}",
+ msg.appId, msg.appAttemptId, msg.shuffleId,
+ msg.shuffleMergeId, partition.reduceId, partition.getLastChunkOffset(),
+ partition.indexFile.getPos(), partition.metaFile.getPos());
+ }
} catch (IOException ioe) {
- logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
- msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
+ logger.warn("{} attempt {} shuffle {} shuffleMerge {}: exception while " +
+ "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId,
+ msg.shuffleMergeId, partition.reduceId);
} finally {
partition.closeAllFilesAndDeleteIfNeeded(false);
}
@@ -592,8 +580,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
Longs.toArray(sizes));
}
- logger.info("Finalized shuffle {} with shuffleMergeId {} from Application {}_{}.",
- msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
+ logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of shuffle merge completed",
+ msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
return mergeStatuses;
}
@@ -806,7 +794,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
int reduceId) {
return null == appShuffleMergePartitionsInfo ||
- INDETERMINATE_SHUFFLE_FINALIZED == appShuffleMergePartitionsInfo.shuffleMergePartitions ||
+ appShuffleMergePartitionsInfo.isFinalized() ||
!appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
}
@@ -1006,20 +994,27 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
* required for the shuffles of indeterminate stages.
*/
public static class AppShuffleMergePartitionsInfo {
+ // ConcurrentHashMap doesn't allow null for keys or values which is why this is required.
+ // Marker to identify finalized shuffle partitions.
+ private static final Map<Integer, AppShufflePartitionInfo> SHUFFLE_FINALIZED_MARKER =
+ Collections.emptyMap();
private final int shuffleMergeId;
private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;
- public AppShuffleMergePartitionsInfo(
- int shuffleMergeId, boolean shuffleFinalized) {
+ public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) {
this.shuffleMergeId = shuffleMergeId;
- this.shuffleMergePartitions = shuffleFinalized ?
- INDETERMINATE_SHUFFLE_FINALIZED : new ConcurrentHashMap<>();
+ this.shuffleMergePartitions = shuffleFinalized ? SHUFFLE_FINALIZED_MARKER :
+ new ConcurrentHashMap<>();
}
@VisibleForTesting
public Map<Integer, AppShufflePartitionInfo> getShuffleMergePartitions() {
return shuffleMergePartitions;
}
+
+ public boolean isFinalized() {
+ return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER;
+ }
}
/** Metadata tracked for an actively merged shuffle partition */
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 3324b4e..c786047 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -1161,8 +1161,8 @@ public class RemoteBlockPushResolverSuite {
RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
pushResolver.validateAndGetAppShuffleInfo(TEST_APP);
- assertTrue("Metadata of determinate shuffle should be removed after finalize shuffle"
- + " merge", appShuffleInfo.getShuffles().get(0) == null);
+ assertTrue("Determinate shuffle should be marked finalized",
+ appShuffleInfo.getShuffles().get(0).isFinalized());
validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 5}, new int[][]{{0}, {1}});
@@ -1287,6 +1287,79 @@ public class RemoteBlockPushResolverSuite {
+ " up", appShuffleInfo.getMergedShuffleDataFile(0, 4, 0).exists());
}
+ @Test
+ public void testFinalizationResultIsEmptyWhenTheServerDidNotReceiveAnyBlocks() {
+ //shuffle 1 0 is finalized even though the server didn't receive any blocks for it.
+ MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+ new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 1, 0));
+ assertEquals("no partitions were merged", 0, statuses.reduceIds.length);
+ RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
+ pushResolver.validateAndGetAppShuffleInfo(TEST_APP);
+ assertTrue("shuffle 1 should be marked finalized",
+ appShuffleInfo.getShuffles().get(1).isFinalized());
+ removeApplication(TEST_APP);
+ }
+
+ // Test for SPARK-37675 and SPARK-37793
+ @Test
+ public void testEmptyMergePartitionsAreNotReported() throws IOException {
+ //shufflePush_1_0_0_100 is received by the server
+ StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 100, 0));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
+ //shuffle 1 0 is finalized
+ MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+ new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 1, 0));
+ assertEquals("no partitions were merged", 0, statuses.reduceIds.length);
+ removeApplication(TEST_APP);
+ }
+
+ // Test for SPARK-37675 and SPARK-37793
+ @Test
+ public void testAllBlocksAreRejectedWhenReceivedAfterFinalization() throws IOException {
+ //shufflePush_1_0_0_100 is received by the server
+ StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 100, 0));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
+ stream1.onComplete(stream1.getID());
+ //shuffle 1 0 is finalized
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 1, 0));
+ BlockPushNonFatalFailure errorToValidate = null;
+ try {
+ //shufflePush_1_0_0_200 is received by the server after finalization of shuffle 1 0 which
+ //should be rejected
+ StreamCallbackWithID failureCallback = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 200, 0));
+ failureCallback.onComplete(failureCallback.getID());
+ } catch (BlockPushNonFatalFailure e) {
+ BlockPushReturnCode errorCode =
+ (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+ assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+ errorCode.returnCode);
+ errorToValidate = e;
+ assertEquals(errorCode.failureBlockId, "shufflePush_1_0_0_200");
+ }
+ assertNotNull("shufflePush_1_0_0_200 should be rejected", errorToValidate);
+ try {
+ //shufflePush_1_0_1_100 is received by the server after finalization of shuffle 1 0 which
+ //should also be rejected
+ StreamCallbackWithID failureCallback = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 1, 100, 0));
+ failureCallback.onComplete(failureCallback.getID());
+ } catch (BlockPushNonFatalFailure e) {
+ BlockPushReturnCode errorCode =
+ (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+ assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+ errorCode.returnCode);
+ errorToValidate = e;
+ assertEquals(errorCode.failureBlockId, "shufflePush_1_0_1_100");
+ }
+ assertNotNull("shufflePush_1_0_1_100 should be rejected", errorToValidate);
+ MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 1, 0, 100);
+ validateChunks(TEST_APP, 1, 0, 100, blockMeta, new int[]{4}, new int[][]{{0}});
+ removeApplication(TEST_APP);
+ }
+
private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
pushResolver = new RemoteBlockPushResolver(conf) {
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org