You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2022/01/29 03:14:43 UTC

[spark] branch branch-3.2 updated: [SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files once the shuffle is finalized

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 837c6b4  [SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files once the shuffle is finalized
837c6b4 is described below

commit 837c6b436745cb9fb8411b30e0a26144a46a3589
Author: Chandni Singh <si...@gmail.com>
AuthorDate: Fri Jan 28 21:12:28 2022 -0600

    [SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files once the shuffle is finalized
    
    ### What changes were proposed in this pull request?
    This fixes the bugs that were reported in SPARK-37675 and SPARK-37793.
    - Empty merged partitions were reported by the shuffle server to the driver.
    - The push merged files were getting overwritten after a shuffle merge is finalized.
    - Throwing exception in the finalization of a shuffle for which the shuffle server didn't receive any blocks.
    
    ### Why are the changes needed?
    Changes are needed to fix the bugs listed above.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests.
    
    Closes #35325 from otterc/SPARK-37675.
    
    Authored-by: Chandni Singh <si...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 9afb407fa7aaf2f0961661b5d8cfbec549e591ee)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java   | 171 ++++++++++-----------
 .../shuffle/RemoteBlockPushResolverSuite.java      |  77 +++++++++-
 2 files changed, 158 insertions(+), 90 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index d04db67..c1d4d5b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -82,20 +82,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   public static final String MERGE_DIR_KEY = "mergeDir";
   public static final String ATTEMPT_ID_KEY = "attemptId";
   private static final int UNDEFINED_ATTEMPT_ID = -1;
-  // Shuffles of determinate stages will have shuffleMergeId set to 0
-  private static final int DETERMINATE_SHUFFLE_MERGE_ID = 0;
   private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler();
   // ByteBuffer to respond to client upon a successful merge of a pushed block
   private static final ByteBuffer SUCCESS_RESPONSE =
     new BlockPushReturnCode(ReturnCode.SUCCESS.id(), "").toByteBuffer().asReadOnlyBuffer();
 
-  // ConcurrentHashMap doesn't allow null for keys or values which is why this is required.
-  // Marker to identify finalized indeterminate shuffle partitions in the case of indeterminate
-  // stage retries.
-  @VisibleForTesting
-  public static final Map<Integer, AppShufflePartitionInfo> INDETERMINATE_SHUFFLE_FINALIZED =
-    Collections.emptyMap();
-
   /**
    * A concurrent hashmap where the key is the applicationId, and the value includes
    * all the merged shuffle information for this application. AppShuffleInfo stores
@@ -168,59 +159,45 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       String blockId) throws BlockPushNonFatalFailure {
     ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> shuffles = appShuffleInfo.shuffles;
     AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
-      shuffles.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
-        if (appShuffleMergePartitionsInfo == null) {
-          File dataFile =
-            appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
-          // If this partition is already finalized then the partitions map will not contain the
-          // shuffleId for determinate stages but the data file would exist.
-          // In that case the block is considered late. In the case of indeterminate stages, most
-          // recent shuffleMergeId finalized would be pointing to INDETERMINATE_SHUFFLE_FINALIZED
-          if (dataFile.exists()) {
-            throw new BlockPushNonFatalFailure(new BlockPushReturnCode(
-              ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
-              BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH));
-          } else {
-            logger.info("Creating a new attempt for shuffle blocks push request for shuffle {}"
-              + " with shuffleMergeId {} for application {}_{}", shuffleId, shuffleMergeId,
-              appShuffleInfo.appId, appShuffleInfo.attemptId);
-            return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
-          }
+      shuffles.compute(shuffleId, (id, mergePartitionsInfo) -> {
+        if (mergePartitionsInfo == null) {
+          logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a new shuffle " +
+              "merge metadata", appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId,
+              shuffleMergeId);
+          return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
         } else {
-          // Reject the request as we have already seen a higher shuffleMergeId than the
-          // current incoming one
-          int latestShuffleMergeId = appShuffleMergePartitionsInfo.shuffleMergeId;
+          int latestShuffleMergeId = mergePartitionsInfo.shuffleMergeId;
           if (latestShuffleMergeId > shuffleMergeId) {
+            // Reject the request as we have already seen a higher shuffleMergeId than the one
+            // in the current request.
             throw new BlockPushNonFatalFailure(
               new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
               BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.STALE_BLOCK_PUSH));
-          } else if (latestShuffleMergeId == shuffleMergeId) {
-            return appShuffleMergePartitionsInfo;
-          } else {
+          } else if (latestShuffleMergeId < shuffleMergeId){
             // Higher shuffleMergeId seen for the shuffle ID meaning new stage attempt is being
             // run for the shuffle ID. Close and clean up old shuffleMergeId files,
             // happens in the indeterminate stage retries
-            logger.info("Creating a new attempt for shuffle blocks push request for shuffle {}"
-              + " with shuffleMergeId {} for application {}_{} since it is higher than the"
-              + " latest shuffleMergeId {} already seen", shuffleId, shuffleMergeId,
-              appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId);
+            logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a new shuffle " +
+                "merge metadata since received shuffleMergeId is higher than latest " +
+                "shuffleMergeId {}", appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId,
+                shuffleMergeId, latestShuffleMergeId);
             mergedShuffleCleaner.execute(() ->
-              closeAndDeletePartitionFiles(appShuffleMergePartitionsInfo.shuffleMergePartitions));
+                closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
             return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
+          } else {
+            // The request is for block with same shuffleMergeId as the latest shuffleMergeId
+            if (mergePartitionsInfo.isFinalized()) {
+              throw new BlockPushNonFatalFailure(
+                  new BlockPushReturnCode(
+                      ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
+                  BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH));
+            }
+            return mergePartitionsInfo;
           }
         }
       });
-
-    // It only gets here when the shuffle is already finalized.
-    if (null == shufflePartitionsWithMergeId ||
-        INDETERMINATE_SHUFFLE_FINALIZED == shufflePartitionsWithMergeId.shuffleMergePartitions) {
-      throw new BlockPushNonFatalFailure(
-        new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
-        BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH));
-    }
-
     Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
-      shufflePartitionsWithMergeId.shuffleMergePartitions;
+        shufflePartitionsWithMergeId.shuffleMergePartitions;
     return shuffleMergePartitions.computeIfAbsent(reduceId, key -> {
       // It only gets here when the key is not present in the map. The first time the merge
       // manager receives a pushed block for a given application shuffle partition.
@@ -234,9 +211,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId, shuffleMergeId,
           reduceId, dataFile, indexFile, metaFile);
       } catch (IOException e) {
-        logger.error(
-          "Cannot create merged shuffle partition with data file {}, index file {}, and "
-            + "meta file {}", dataFile.getAbsolutePath(),
+        logger.error("{} attempt {} shuffle {} shuffleMerge {}: cannot create merged shuffle " +
+            "partition with data file {}, index file {}, and meta file {}", appShuffleInfo.appId,
+            appShuffleInfo.attemptId, shuffleId, shuffleMergeId, dataFile.getAbsolutePath(),
             indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
         throw new RuntimeException(
           String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s "
@@ -349,6 +326,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
    * If cleanupLocalDirs is true, the merged shuffle files will also be deleted.
    * The cleanup will be executed in a separate thread.
    */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @VisibleForTesting
   void closeAndDeletePartitionFilesIfNeeded(
       AppShuffleInfo appShuffleInfo,
@@ -511,10 +489,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
   }
 
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @Override
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
-    logger.info("Finalizing shuffle {} with shuffleMergeId {} from Application {}_{}.",
-      msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
+    logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalize shuffle merge",
+        msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
     if (appShuffleInfo.attemptId != msg.appAttemptId) {
       // If finalizeShuffleMerge from a former application attempt, it is considered late,
@@ -533,35 +512,33 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
     AtomicReference<Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitionsRef =
       new AtomicReference<>(null);
-    // Metadata of the determinate stage shuffle can be safely removed as part of finalizing
-    // shuffle merge. Currently once the shuffle is finalized for a determinate stages, retry
-    // stages of the same shuffle will have shuffle push disabled.
-    if (msg.shuffleMergeId == DETERMINATE_SHUFFLE_MERGE_ID) {
-      AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo =
-        appShuffleInfo.shuffles.remove(msg.shuffleId);
-      if (appShuffleMergePartitionsInfo != null) {
-        shuffleMergePartitionsRef.set(appShuffleMergePartitionsInfo.shuffleMergePartitions);
-      }
-    } else {
-      appShuffleInfo.shuffles.compute(msg.shuffleId, (id, value) -> {
-        if (null == value || msg.shuffleMergeId < value.shuffleMergeId ||
-          INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) {
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      if (null != mergePartitionsInfo) {
+        if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId ||
+          mergePartitionsInfo.isFinalized()) {
           throw new RuntimeException(String.format(
-            "Shuffle merge finalize request for shuffle %s with" + " shuffleMergeId %s is %s",
-            msg.shuffleId, msg.shuffleMergeId,
-            ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
-        } else if (msg.shuffleMergeId > value.shuffleMergeId) {
+              "Shuffle merge finalize request for shuffle %s with" + " shuffleMergeId %s is %s",
+              msg.shuffleId, msg.shuffleMergeId,
+              ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
+        } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
           mergedShuffleCleaner.execute(() ->
-            closeAndDeletePartitionFiles(value.shuffleMergePartitions));
-          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+              closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
         } else {
-          shuffleMergePartitionsRef.set(value.shuffleMergePartitions);
-          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+          // This block covers:
+          //  1. finalization of determinate stage
+          //  2. finalization of indeterminate stage if the shuffleMergeId related to it is the one
+          //  for which the message is received.
+          shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
         }
-      });
-    }
+      }
+      // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results
+      // sent to the driver will be empty. This can happen when the service didn't receive any
+      // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the
+      // shuffle.
+      return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+    });
     Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions = shuffleMergePartitionsRef.get();
     MergeStatuses mergeStatuses;
     if (null == shuffleMergePartitions || shuffleMergePartitions.isEmpty()) {
@@ -575,14 +552,25 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
         synchronized (partition) {
           try {
+            logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalizing shuffle " +
+                "partition {} ", msg.appId, msg.appAttemptId, msg.shuffleId,
+                msg.shuffleMergeId, partition.reduceId);
             // This can throw IOException which will marks this shuffle partition as not merged.
             partition.finalizePartition();
-            bitmaps.add(partition.mapTracker);
-            reduceIds.add(partition.reduceId);
-            sizes.add(partition.getLastChunkOffset());
+            if (partition.mapTracker.getCardinality() > 0) {
+              bitmaps.add(partition.mapTracker);
+              reduceIds.add(partition.reduceId);
+              sizes.add(partition.getLastChunkOffset());
+              logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalization results " +
+                  "added for partition {} data size {} index size {} meta size {}",
+                  msg.appId, msg.appAttemptId, msg.shuffleId,
+                  msg.shuffleMergeId, partition.reduceId, partition.getLastChunkOffset(),
+                  partition.indexFile.getPos(), partition.metaFile.getPos());
+            }
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
-              msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("{} attempt {} shuffle {} shuffleMerge {}: exception while " +
+                "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId,
+                msg.shuffleMergeId, partition.reduceId);
           } finally {
             partition.closeAllFilesAndDeleteIfNeeded(false);
           }
@@ -592,8 +580,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    logger.info("Finalized shuffle {} with shuffleMergeId {} from Application {}_{}.",
-      msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
+    logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of shuffle merge completed",
+        msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
     return mergeStatuses;
   }
 
@@ -806,7 +794,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
         int reduceId) {
       return null == appShuffleMergePartitionsInfo ||
-        INDETERMINATE_SHUFFLE_FINALIZED == appShuffleMergePartitionsInfo.shuffleMergePartitions ||
+        appShuffleMergePartitionsInfo.isFinalized() ||
         !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
     }
 
@@ -1006,20 +994,27 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
    * required for the shuffles of indeterminate stages.
    */
   public static class AppShuffleMergePartitionsInfo {
+    // ConcurrentHashMap doesn't allow null for keys or values which is why this is required.
+    // Marker to identify finalized shuffle partitions.
+    private static final Map<Integer, AppShufflePartitionInfo> SHUFFLE_FINALIZED_MARKER =
+        Collections.emptyMap();
     private final int shuffleMergeId;
     private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;
 
-    public AppShuffleMergePartitionsInfo(
-        int shuffleMergeId, boolean shuffleFinalized) {
+    public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) {
       this.shuffleMergeId = shuffleMergeId;
-      this.shuffleMergePartitions = shuffleFinalized ?
-        INDETERMINATE_SHUFFLE_FINALIZED : new ConcurrentHashMap<>();
+      this.shuffleMergePartitions = shuffleFinalized ? SHUFFLE_FINALIZED_MARKER :
+          new ConcurrentHashMap<>();
     }
 
     @VisibleForTesting
     public Map<Integer, AppShufflePartitionInfo> getShuffleMergePartitions() {
       return shuffleMergePartitions;
     }
+
+    public boolean isFinalized() {
+      return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER;
+    }
   }
 
   /** Metadata tracked for an actively merged shuffle partition */
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 3324b4e..c786047 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -1161,8 +1161,8 @@ public class RemoteBlockPushResolverSuite {
 
     RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
       pushResolver.validateAndGetAppShuffleInfo(TEST_APP);
-    assertTrue("Metadata of determinate shuffle should be removed after finalize shuffle"
-      + " merge", appShuffleInfo.getShuffles().get(0) == null);
+    assertTrue("Determinate shuffle should be marked finalized",
+        appShuffleInfo.getShuffles().get(0).isFinalized());
     validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 5}, new int[][]{{0}, {1}});
@@ -1287,6 +1287,79 @@ public class RemoteBlockPushResolverSuite {
       + " up", appShuffleInfo.getMergedShuffleDataFile(0, 4, 0).exists());
   }
 
+  @Test
+  public void testFinalizationResultIsEmptyWhenTheServerDidNotReceiveAnyBlocks() {
+    //shuffle 1 0 is finalized even though the server didn't receive any blocks for it.
+    MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+        new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 1, 0));
+    assertEquals("no partitions were merged", 0, statuses.reduceIds.length);
+    RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
+        pushResolver.validateAndGetAppShuffleInfo(TEST_APP);
+    assertTrue("shuffle 1 should be marked finalized",
+        appShuffleInfo.getShuffles().get(1).isFinalized());
+    removeApplication(TEST_APP);
+  }
+
+  // Test for SPARK-37675 and SPARK-37793
+  @Test
+  public void testEmptyMergePartitionsAreNotReported() throws IOException {
+    //shufflePush_1_0_0_100 is received by the server
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 100, 0));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
+    //shuffle 1 0 is finalized
+    MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+        new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 1, 0));
+    assertEquals("no partitions were merged", 0, statuses.reduceIds.length);
+    removeApplication(TEST_APP);
+  }
+
+  // Test for SPARK-37675 and SPARK-37793
+  @Test
+  public void testAllBlocksAreRejectedWhenReceivedAfterFinalization() throws IOException {
+    //shufflePush_1_0_0_100 is received by the server
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 100, 0));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
+    stream1.onComplete(stream1.getID());
+    //shuffle 1 0 is finalized
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 1, 0));
+    BlockPushNonFatalFailure errorToValidate = null;
+    try {
+      //shufflePush_1_0_0_200 is received by the server after finalization of shuffle 1 0 which
+      //should be rejected
+      StreamCallbackWithID failureCallback = pushResolver.receiveBlockDataAsStream(
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 200, 0));
+      failureCallback.onComplete(failureCallback.getID());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+          (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+          errorCode.returnCode);
+      errorToValidate = e;
+      assertEquals(errorCode.failureBlockId, "shufflePush_1_0_0_200");
+    }
+    assertNotNull("shufflePush_1_0_0_200 should be rejected", errorToValidate);
+    try {
+      //shufflePush_1_0_1_100 is received by the server after finalization of shuffle 1 0 which
+      //should also be rejected
+      StreamCallbackWithID failureCallback = pushResolver.receiveBlockDataAsStream(
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 1, 100, 0));
+      failureCallback.onComplete(failureCallback.getID());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+          (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+          errorCode.returnCode);
+      errorToValidate = e;
+      assertEquals(errorCode.failureBlockId, "shufflePush_1_0_1_100");
+    }
+    assertNotNull("shufflePush_1_0_1_100 should be rejected", errorToValidate);
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 1, 0, 100);
+    validateChunks(TEST_APP, 1, 0, 100, blockMeta, new int[]{4}, new int[][]{{0}});
+    removeApplication(TEST_APP);
+  }
+
   private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org