You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/09 15:10:20 UTC
[incubator-uniffle] branch master updated: [MINOR][IMPROVEMENT] Return index-file size of n*SEGMENT_SIZE in HDFS reading (#204)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9ef48d6b [MINOR][IMPROVEMENT] Return index-file size of n*SEGMENT_SIZE in HDFS reading (#204)
9ef48d6b is described below
commit 9ef48d6bc7891a3866fbd95ce5f2d8ffd0ae2f25
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Fri Sep 9 23:10:16 2022 +0800
[MINOR][IMPROVEMENT] Return index-file size of n*SEGMENT_SIZE in HDFS reading (#204)
### What changes were proposed in this pull request?
Return index-file size of n*SEGMENT_SIZE in HDFS reading
### Why are the changes needed?
It should be kept consistent with the local-file reader's logic of returning the complete index data, where the data size is always n*SEGMENT_SIZE.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
---
.../org/apache/uniffle/common/util/RssUtils.java | 4 ++--
.../apache/uniffle/common/util/RssUtilsTest.java | 9 +++++++-
.../handler/impl/HdfsShuffleReadHandler.java | 11 ++++++++-
.../handler/impl/HdfsShuffleWriteHandler.java | 3 ++-
.../handler/impl/LocalFileServerReadHandler.java | 6 ++++-
.../handler/impl/HdfsClientReadHandlerTest.java | 27 +++++++++++++++++++++-
6 files changed, 53 insertions(+), 7 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 896711bf..47160204 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -53,6 +53,7 @@ import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
public class RssUtils {
@@ -226,8 +227,7 @@ public class RssUtils {
fileOffset = -1;
}
} catch (BufferUnderflowException ue) {
- LOGGER.warn("Read index data under flow", ue);
- break;
+ throw new RssException("Read index data under flow", ue);
}
}
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index a5211174..c67bf482 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -156,7 +156,14 @@ public class RssUtilsTest {
incompleteByteBuffer.putLong(1L);
incompleteByteBuffer.putInt(2);
data = incompleteByteBuffer.array();
- assertTrue(RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data), readBufferSize).isEmpty());
+ // It should throw exception
+ try {
+ RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data), readBufferSize);
+ fail();
+ } catch (Exception e) {
+ // ignore
+ assertTrue(e.getMessage().contains("Read index data under flow"));
+ }
}
@Test
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index 011ac0f2..f91e27f4 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
/**
@@ -62,10 +63,18 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
long start = System.currentTimeMillis();
try {
byte[] indexData = indexReader.read();
+ int segmentNumber = (int) (indexData.length / FileBasedShuffleSegment.SEGMENT_SIZE);
+ int expectedLen = segmentNumber * FileBasedShuffleSegment.SEGMENT_SIZE;
+ if (indexData.length != expectedLen) {
+ LOG.warn("Maybe the index file: {} is being written due to the shuffle-buffer flushing.", filePrefix);
+ byte[] indexNewData = new byte[expectedLen];
+ System.arraycopy(indexData, 0, indexNewData, 0, expectedLen);
+ indexData = indexNewData;
+ }
LOG.info("Read index files {}.index for {} ms", filePrefix, System.currentTimeMillis() - start);
return new ShuffleIndexResult(indexData);
} catch (Exception e) {
- LOG.info("Fail to read index files {}.index", filePrefix);
+ LOG.info("Fail to read index files {}.index", filePrefix, e);
}
return new ShuffleIndexResult();
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
index f6b89b59..af59ecbd 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
@@ -152,7 +152,8 @@ public class HdfsShuffleWriteHandler implements ShuffleWriteHandler {
fileNamePrefix);
}
- private HdfsFileWriter createWriter(String fileName) throws IOException, IllegalStateException {
+ @VisibleForTesting
+ public HdfsFileWriter createWriter(String fileName) throws IOException, IllegalStateException {
Path path = new Path(basePath, fileName);
HdfsFileWriter writer = new HdfsFileWriter(fileSystem, path, hadoopConf);
return writer;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index 588b70c1..1eeb75bc 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -139,8 +139,12 @@ public class LocalFileServerReadHandler implements ServerReadHandler {
int indexNum = 0;
int len = 0;
try (LocalFileReader reader = createFileReader(indexFileName)) {
- indexNum = (int) (new File(indexFileName).length() / FileBasedShuffleSegment.SEGMENT_SIZE);
+ long fileSize = new File(indexFileName).length();
+ indexNum = (int) (fileSize / FileBasedShuffleSegment.SEGMENT_SIZE);
len = indexNum * FileBasedShuffleSegment.SEGMENT_SIZE;
+ if (fileSize != len) {
+ LOG.warn("Maybe the index file: {} is being written due to the shuffle-buffer flushing.", indexFileName);
+ }
byte[] indexData = reader.read(0, len);
return new ShuffleIndexResult(indexData);
} catch (Exception e) {
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
index 26c0175e..6309392f 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.storage.handler.impl;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -30,7 +31,10 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.storage.HdfsTestBase;
+import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import static org.apache.uniffle.storage.HdfsShuffleHandlerTestBase.calcExpectedSegmentNum;
import static org.apache.uniffle.storage.HdfsShuffleHandlerTestBase.checkData;
@@ -57,12 +61,12 @@ public class HdfsClientReadHandlerTest extends HdfsTestBase {
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
int readBufferSize = 13;
int total = 0;
int totalBlockNum = 0;
int expectTotalBlockNum = 0;
+
for (int i = 0; i < 5; i++) {
writeHandler.setFailTimes(i);
int num = new Random().nextInt(17);
@@ -72,6 +76,27 @@ public class HdfsClientReadHandlerTest extends HdfsTestBase {
expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
}
+ /**
+ * This part is to check the fault tolerance of reading HDFS incomplete index file
+ */
+ String indexFileName = ShuffleStorageUtils.generateIndexFileName("test_0");
+ HdfsFileWriter indexWriter = writeHandler.createWriter(indexFileName);
+ indexWriter.writeData(ByteBuffer.allocate(4).putInt(169560).array());
+ indexWriter.writeData(ByteBuffer.allocate(4).putInt(999).array());
+ indexWriter.close();
+
+ Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+
+ HdfsShuffleReadHandler indexReader = new HdfsShuffleReadHandler(
+ "appId", 0, 1, basePath + "/appId/0/1-1/test_0",
+ readBufferSize, expectBlockIds, processBlockIds, hadoopConf);
+ try {
+ ShuffleIndexResult indexResult = indexReader.readShuffleIndex();
+ assertEquals(0, indexResult.getIndexData().length % FileBasedShuffleSegment.SEGMENT_SIZE);
+ } catch (Exception e) {
+ fail();
+ }
+
HdfsClientReadHandler handler = new HdfsClientReadHandler(
"appId",
0,