You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/09 15:10:20 UTC

[incubator-uniffle] branch master updated: [MINOR][IMPROVEMENT] Return index-file size of n*SEGMENT_SIZE in HDFS reading (#204)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ef48d6b [MINOR][IMPROVEMENT] Return index-file size of n*SEGMENT_SIZE in HDFS reading (#204)
9ef48d6b is described below

commit 9ef48d6bc7891a3866fbd95ce5f2d8ffd0ae2f25
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Fri Sep 9 23:10:16 2022 +0800

    [MINOR][IMPROVEMENT] Return index-file size of n*SEGMENT_SIZE in HDFS reading (#204)
    
    ### What changes were proposed in this pull request?
    Return index-file size of n*SEGMENT_SIZE in HDFS reading
    
    ### Why are the changes needed?
    It should stay consistent with the local-file reader's logic of returning complete index data, whose size is always n*SEGMENT_SIZE.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
---
 .../org/apache/uniffle/common/util/RssUtils.java   |  4 ++--
 .../apache/uniffle/common/util/RssUtilsTest.java   |  9 +++++++-
 .../handler/impl/HdfsShuffleReadHandler.java       | 11 ++++++++-
 .../handler/impl/HdfsShuffleWriteHandler.java      |  3 ++-
 .../handler/impl/LocalFileServerReadHandler.java   |  6 ++++-
 .../handler/impl/HdfsClientReadHandlerTest.java    | 27 +++++++++++++++++++++-
 6 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 896711bf..47160204 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -53,6 +53,7 @@ import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
 
 public class RssUtils {
 
@@ -226,8 +227,7 @@ public class RssUtils {
           fileOffset = -1;
         }
       } catch (BufferUnderflowException ue) {
-        LOGGER.warn("Read index data under flow", ue);
-        break;
+        throw new RssException("Read index data under flow", ue);
       }
     }
 
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index a5211174..c67bf482 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -156,7 +156,14 @@ public class RssUtilsTest {
     incompleteByteBuffer.putLong(1L);
     incompleteByteBuffer.putInt(2);
     data = incompleteByteBuffer.array();
-    assertTrue(RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data), readBufferSize).isEmpty());
+    // It should throw exception
+    try {
+      RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data), readBufferSize);
+      fail();
+    } catch (Exception e) {
+      // ignore
+      assertTrue(e.getMessage().contains("Read index data under flow"));
+    }
   }
 
   @Test
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index 011ac0f2..f91e27f4 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
 /**
@@ -62,10 +63,18 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
     long start = System.currentTimeMillis();
     try {
       byte[] indexData = indexReader.read();
+      int segmentNumber = (int) (indexData.length / FileBasedShuffleSegment.SEGMENT_SIZE);
+      int expectedLen = segmentNumber * FileBasedShuffleSegment.SEGMENT_SIZE;
+      if (indexData.length != expectedLen) {
+        LOG.warn("Maybe the index file: {} is being written due to the shuffle-buffer flushing.", filePrefix);
+        byte[] indexNewData = new byte[expectedLen];
+        System.arraycopy(indexData, 0, indexNewData, 0, expectedLen);
+        indexData = indexNewData;
+      }
       LOG.info("Read index files {}.index for {} ms", filePrefix, System.currentTimeMillis() - start);
       return new ShuffleIndexResult(indexData);
     } catch (Exception e) {
-      LOG.info("Fail to read index files {}.index", filePrefix);
+      LOG.info("Fail to read index files {}.index", filePrefix, e);
     }
     return new ShuffleIndexResult();
   }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
index f6b89b59..af59ecbd 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
@@ -152,7 +152,8 @@ public class HdfsShuffleWriteHandler implements ShuffleWriteHandler {
         fileNamePrefix);
   }
 
-  private HdfsFileWriter createWriter(String fileName) throws IOException, IllegalStateException {
+  @VisibleForTesting
+  public HdfsFileWriter createWriter(String fileName) throws IOException, IllegalStateException {
     Path path = new Path(basePath, fileName);
     HdfsFileWriter writer = new HdfsFileWriter(fileSystem, path, hadoopConf);
     return writer;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index 588b70c1..1eeb75bc 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -139,8 +139,12 @@ public class LocalFileServerReadHandler implements ServerReadHandler {
     int indexNum = 0;
     int len = 0;
     try (LocalFileReader reader = createFileReader(indexFileName)) {
-      indexNum = (int)  (new File(indexFileName).length() / FileBasedShuffleSegment.SEGMENT_SIZE);
+      long fileSize = new File(indexFileName).length();
+      indexNum = (int)  (fileSize / FileBasedShuffleSegment.SEGMENT_SIZE);
       len = indexNum * FileBasedShuffleSegment.SEGMENT_SIZE;
+      if (fileSize != len) {
+        LOG.warn("Maybe the index file: {} is being written due to the shuffle-buffer flushing.", indexFileName);
+      }
       byte[] indexData = reader.read(0, len);
       return new ShuffleIndexResult(indexData);
     } catch (Exception e) {
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
index 26c0175e..6309392f 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.storage.handler.impl;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -30,7 +31,10 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.storage.HdfsTestBase;
+import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
 import static org.apache.uniffle.storage.HdfsShuffleHandlerTestBase.calcExpectedSegmentNum;
 import static org.apache.uniffle.storage.HdfsShuffleHandlerTestBase.checkData;
@@ -57,12 +61,12 @@ public class HdfsClientReadHandlerTest extends HdfsTestBase {
 
       Map<Long, byte[]> expectedData = Maps.newHashMap();
       Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
-      Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
 
       int readBufferSize = 13;
       int total = 0;
       int totalBlockNum = 0;
       int expectTotalBlockNum = 0;
+
       for (int i = 0; i < 5; i++) {
         writeHandler.setFailTimes(i);
         int num = new Random().nextInt(17);
@@ -72,6 +76,27 @@ public class HdfsClientReadHandlerTest extends HdfsTestBase {
         expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
       }
 
+      /**
+       * This part is to check the fault tolerance of reading HDFS incomplete index file
+       */
+      String indexFileName = ShuffleStorageUtils.generateIndexFileName("test_0");
+      HdfsFileWriter indexWriter = writeHandler.createWriter(indexFileName);
+      indexWriter.writeData(ByteBuffer.allocate(4).putInt(169560).array());
+      indexWriter.writeData(ByteBuffer.allocate(4).putInt(999).array());
+      indexWriter.close();
+
+      Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+
+      HdfsShuffleReadHandler indexReader = new HdfsShuffleReadHandler(
+          "appId", 0, 1, basePath + "/appId/0/1-1/test_0",
+          readBufferSize, expectBlockIds, processBlockIds, hadoopConf);
+      try {
+        ShuffleIndexResult indexResult = indexReader.readShuffleIndex();
+        assertEquals(0, indexResult.getIndexData().length % FileBasedShuffleSegment.SEGMENT_SIZE);
+      } catch (Exception e) {
+        fail();
+      }
+
       HdfsClientReadHandler handler = new HdfsClientReadHandler(
           "appId",
           0,