You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/29 08:19:54 UTC

[incubator-uniffle] branch master updated: [ISSUE-369] Don't throw exception if blocks are corrupted but have multi replicas (#374)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new c503f8f8 [ISSUE-369] Don't throw exception if blocks are corrupted but have multi replicas (#374)
c503f8f8 is described below

commit c503f8f8ee522f67551652c5821cc1a7feb92ef7
Author: xianjingfeng <58...@qq.com>
AuthorDate: Tue Nov 29 16:19:49 2022 +0800

    [ISSUE-369] Don't throw exception if blocks are corrupted but have multi replicas (#374)
    
    ### What changes were proposed in this pull request?
    Don't throw an exception if blocks are corrupted but have multiple replicas
    
    ### Why are the changes needed?
    If some blocks in one replica are corrupted, the other replicas may still be intact, so an exception should not be thrown here when blocks have multiple replicas (see #369).
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests (UT).
---
 .../shuffle/reader/RssShuffleDataIteratorTest.java | 13 ++++++-
 .../uniffle/client/impl/ShuffleReadClientImpl.java | 44 ++++++++++++++--------
 .../client/impl/ShuffleReadClientImplTest.java     |  9 +++++
 3 files changed, 49 insertions(+), 17 deletions(-)

diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 77d6b31c..9983d9a0 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -239,10 +239,12 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
 
     RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap,
         taskIdBitmap, Lists.newArrayList(ssi1));
-
+    RssShuffleDataIterator rssShuffleDataIterator2 = getDataIterator(basePath, blockIdBitmap,
+        taskIdBitmap, Lists.newArrayList(ssi1, ssi2));
     // crc32 is incorrect
     try (MockedStatic<ChecksumUtils> checksumUtilsMock = Mockito.mockStatic(ChecksumUtils.class)) {
       checksumUtilsMock.when(() -> ChecksumUtils.getCrc32((ByteBuffer) any())).thenReturn(-1L);
+
       try {
         while (rssShuffleDataIterator.hasNext()) {
           rssShuffleDataIterator.next();
@@ -251,6 +253,15 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
       } catch (Exception e) {
         assertTrue(e.getMessage().startsWith("Unexpected crc value"));
       }
+
+      try {
+        while (rssShuffleDataIterator2.hasNext()) {
+          rssShuffleDataIterator2.next();
+        }
+        fail(EXPECTED_EXCEPTION_MESSAGE);
+      } catch (Exception e) {
+        assertTrue(e.getMessage().startsWith("Blocks read inconsistent"));
+      }
     }
   }
 
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 8b6b0d06..a6de15b9 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -47,6 +47,7 @@ import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 public class ShuffleReadClientImpl implements ShuffleReadClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleReadClientImpl.class);
+  private final List<ShuffleServerInfo> shuffleServerInfoList;
   private int shuffleId;
   private int partitionId;
   private byte[] readBuffer;
@@ -83,6 +84,7 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
     this.blockIdBitmap = blockIdBitmap;
     this.taskIdBitmap = taskIdBitmap;
     this.idHelper = idHelper;
+    this.shuffleServerInfoList = shuffleServerInfoList;
 
     CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest();
     request.setStorageType(storageType);
@@ -177,6 +179,32 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
       if (!processedBlockIds.contains(bs.getBlockId())
           && blockIdBitmap.contains(bs.getBlockId())
           && taskIdBitmap.contains(bs.getTaskAttemptId())) {
+        long expectedCrc = -1;
+        long actualCrc = -1;
+        try {
+          long start = System.currentTimeMillis();
+          copyTime.addAndGet(System.currentTimeMillis() - start);
+          start = System.currentTimeMillis();
+          expectedCrc = bs.getCrc();
+          actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(), bs.getLength());
+          crcCheckTime.addAndGet(System.currentTimeMillis() - start);
+        } catch (Exception e) {
+          LOG.warn("Can't read data for blockId[" + bs.getBlockId() + "]", e);
+        }
+
+        if (expectedCrc != actualCrc) {
+          String errMsg = "Unexpected crc value for blockId[" + bs.getBlockId()
+              + "], expected:" + expectedCrc + ", actual:" + actualCrc;
+          //If some blocks of one replica are corrupted,but maybe other replicas are not corrupted,
+          //so exception should not be thrown here if blocks have multiple replicas
+          if (shuffleServerInfoList.size() > 1) {
+            LOG.warn(errMsg);
+            continue;
+          } else {
+            throw new RssException(errMsg);
+          }
+        }
+
         // mark block as processed
         processedBlockIds.addLong(bs.getBlockId());
         pendingBlockIds.removeLong(bs.getBlockId());
@@ -190,22 +218,6 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
     }
 
     if (bs != null) {
-      long expectedCrc = -1;
-      long actualCrc = -1;
-      try {
-        long start = System.currentTimeMillis();
-        copyTime.addAndGet(System.currentTimeMillis() - start);
-        start = System.currentTimeMillis();
-        expectedCrc = bs.getCrc();
-        actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(), bs.getLength());
-        crcCheckTime.addAndGet(System.currentTimeMillis() - start);
-      } catch (Exception e) {
-        LOG.warn("Can't read data for blockId[" + bs.getBlockId() + "]", e);
-      }
-      if (expectedCrc != actualCrc) {
-        throw new RssException("Unexpected crc value for blockId[" + bs.getBlockId()
-            + "], expected:" + expectedCrc + ", actual:" + actualCrc);
-      }
       return new CompressedShuffleBlock(ByteBuffer.wrap(readBuffer,
           bs.getOffset(), bs.getLength()), bs.getUncompressLength());
     }
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index e2f41536..3ca35ebb 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -35,6 +35,7 @@ import org.roaringbitmap.longlong.LongIterator;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.TestUtils;
+import org.apache.uniffle.client.response.CompressedShuffleBlock;
 import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShuffleServerInfo;
@@ -257,6 +258,10 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
     ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
         "appId", 0, 1, 100, 2, 10, 1000,
         basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
+    ShuffleReadClientImpl readClient2 = new ShuffleReadClientImpl(StorageType.HDFS.name(),
+        "appId", 0, 1, 100, 2, 10, 1000,
+        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1, ssi2),
+        new Configuration(), new DefaultIdHelper());
     // crc32 is incorrect
     try (MockedStatic<ChecksumUtils> checksumUtilsMock = Mockito.mockStatic(ChecksumUtils.class)) {
       checksumUtilsMock.when(() -> ChecksumUtils.getCrc32((ByteBuffer) any())).thenReturn(-1L);
@@ -269,8 +274,12 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
       } catch (Exception e) {
         assertTrue(e.getMessage().startsWith("Unexpected crc value"));
       }
+
+      CompressedShuffleBlock block = readClient2.readShuffleBlockData();
+      assertNull(block);
     }
     readClient.close();
+    readClient2.close();
   }
 
   @Test