You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/10/27 06:28:52 UTC
[incubator-uniffle] branch master updated: [ISSUE-239][BUG] RssUtils#transIndexDataToSegments should consider the length of the data file (#275)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2429c67f [ISSUE-239][BUG] RssUtils#transIndexDataToSegments should consider the length of the data file (#275)
2429c67f is described below
commit 2429c67fb3ef48b17e3a11fb1f1665a2fee37c0b
Author: Xianming Lei <31...@users.noreply.github.com>
AuthorDate: Thu Oct 27 14:28:47 2022 +0800
[ISSUE-239][BUG] RssUtils#transIndexDataToSegments should consider the length of the data file (#275)
### What changes were proposed in this pull request?
For issue #239: fix inconsistent blocks when reading shuffle data by taking the length of the data file into account in RssUtils#transIndexDataToSegments.
### Why are the changes needed?
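While the ShuffleServer is still flushing, the index file may record more data than the data file actually contains; without this fix, reading shuffle data then fails (for example, with an EOFException).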
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests (UT) have been added.
Co-authored-by: leixianming <le...@didiglobal.com>
---
.../apache/uniffle/common/ShuffleIndexResult.java | 10 +-
.../org/apache/uniffle/common/util/RssUtils.java | 14 +-
.../uniffle/common/ShuffleIndexResultTest.java | 2 +-
.../apache/uniffle/common/util/RssUtilsTest.java | 4 +-
.../client/impl/grpc/ShuffleServerGrpcClient.java | 2 +-
.../response/RssGetShuffleIndexResponse.java | 4 +-
proto/src/main/proto/Rss.proto | 1 +
.../uniffle/server/ShuffleServerGrpcService.java | 1 +
.../storage/handler/impl/HdfsFileReader.java | 7 +-
.../handler/impl/HdfsShuffleReadHandler.java | 13 +-
.../handler/impl/LocalFileServerReadHandler.java | 10 +-
.../handler/impl/HdfsShuffleReadHandlerTest.java | 139 ++++++++++++++++++
.../storage/handler/impl/LocalFileHandlerTest.java | 112 +++------------
.../handler/impl/LocalFileHandlerTestBase.java | 157 +++++++++++++++++++++
.../impl/LocalFileServerReadHandlerTest.java | 109 ++++++++++++++
15 files changed, 475 insertions(+), 110 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
index 36dc8213..7e5d5b58 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
@@ -19,19 +19,25 @@ package org.apache.uniffle.common;
public class ShuffleIndexResult {
private final byte[] indexData;
+ private long dataFileLen;
public ShuffleIndexResult() {
- this(new byte[0]);
+ this(new byte[0], -1);
}
- public ShuffleIndexResult(byte[] bytes) {
+ public ShuffleIndexResult(byte[] bytes, long dataFileLen) {
this.indexData = bytes;
+ this.dataFileLen = dataFileLen;
}
public byte[] getIndexData() {
return indexData;
}
+ public long getDataFileLen() {
+ return dataFileLen;
+ }
+
public boolean isEmpty() {
return indexData == null || indexData.length == 0;
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 47160204..848d6d01 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -191,15 +191,18 @@ public class RssUtils {
}
byte[] indexData = shuffleIndexResult.getIndexData();
- return transIndexDataToSegments(indexData, readBufferSize);
+ long dataFileLen = shuffleIndexResult.getDataFileLen();
+ return transIndexDataToSegments(indexData, readBufferSize, dataFileLen);
}
- private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexData, int readBufferSize) {
+ private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexData,
+ int readBufferSize, long dataFileLen) {
ByteBuffer byteBuffer = ByteBuffer.wrap(indexData);
List<BufferSegment> bufferSegments = Lists.newArrayList();
List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
int bufferOffset = 0;
long fileOffset = -1;
+ long totalLength = 0;
while (byteBuffer.hasRemaining()) {
try {
@@ -218,6 +221,13 @@ public class RssUtils {
bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
bufferOffset += length;
+ totalLength += length;
+
+ // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater
+ // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException
+ if (dataFileLen != -1 && totalLength >= dataFileLen) {
+ break;
+ }
if (bufferOffset >= readBufferSize) {
ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
diff --git a/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java b/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
index 74c7c0d3..349e5a88 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
@@ -26,7 +26,7 @@ public class ShuffleIndexResultTest {
@Test
public void testEmpty() {
assertTrue(new ShuffleIndexResult().isEmpty());
- assertTrue(new ShuffleIndexResult(null).isEmpty());
+ assertTrue(new ShuffleIndexResult(null, -1).isEmpty());
}
}
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index c67bf482..4c1f5a8c 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -137,7 +137,7 @@ public class RssUtilsTest {
}
byte[] data = byteBuffer.array();
- shuffleDataSegments = RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data), readBufferSize);
+ shuffleDataSegments = RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data, -1), readBufferSize);
assertEquals(expectedTotalSegmentNum, shuffleDataSegments.size());
assertEquals(0, shuffleDataSegments.get(0).getOffset());
@@ -158,7 +158,7 @@ public class RssUtilsTest {
data = incompleteByteBuffer.array();
// It should throw exception
try {
- RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data), readBufferSize);
+ RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data, -1), readBufferSize);
fail();
} catch (Exception e) {
// ignore
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 7f4fa076..f817b2ca 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -576,7 +576,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
switch (statusCode) {
case SUCCESS:
response = new RssGetShuffleIndexResponse(
- ResponseStatusCode.SUCCESS, rpcResponse.getIndexData().toByteArray());
+ ResponseStatusCode.SUCCESS, rpcResponse.getIndexData().toByteArray(), rpcResponse.getDataFileLen());
break;
default:
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
index 74ddb826..8884271c 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
@@ -22,9 +22,9 @@ import org.apache.uniffle.common.ShuffleIndexResult;
public class RssGetShuffleIndexResponse extends ClientResponse {
private final ShuffleIndexResult shuffleIndexResult;
- public RssGetShuffleIndexResponse(ResponseStatusCode statusCode, byte[] data) {
+ public RssGetShuffleIndexResponse(ResponseStatusCode statusCode, byte[] data, long dataFileLen) {
super(statusCode);
- this.shuffleIndexResult = new ShuffleIndexResult(data);
+ this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen);
}
public ShuffleIndexResult getShuffleIndexResult() {
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index c2111bb0..4555a194 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -111,6 +111,7 @@ message GetLocalShuffleIndexResponse {
bytes indexData = 1;
StatusCode status = 2;
string retMsg = 3;
+ int64 dataFileLen = 4;
}
message ReportShuffleResultRequest {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index b7a8ab20..3980cc65 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -556,6 +556,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
+ " bytes with {}", readTime, data.length, requestInfo);
builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
+ builder.setDataFileLen(shuffleIndexResult.getDataFileLen());
reply = builder.build();
} catch (FileNotFoundException indexFileNotFoundException) {
LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.",
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
index a6414114..38d6439e 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
@@ -37,6 +37,7 @@ public class HdfsFileReader implements FileReader, Closeable {
private Path path;
private Configuration hadoopConf;
private FSDataInputStream fsDataInputStream;
+ private FileSystem fileSystem;
public HdfsFileReader(Path path, Configuration hadoopConf) throws Exception {
this.path = path;
@@ -45,7 +46,7 @@ public class HdfsFileReader implements FileReader, Closeable {
}
private void createStream() throws Exception {
- FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
+ fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
if (!fileSystem.isFile(path)) {
String msg = path + " don't exist or is not a file.";
@@ -92,4 +93,8 @@ public class HdfsFileReader implements FileReader, Closeable {
public Path getPath() {
return path;
}
+
+ public long getFileLen() throws IOException {
+ return fileSystem.getFileStatus(path).getLen();
+ }
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index f91e27f4..af94723f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -71,15 +71,15 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
System.arraycopy(indexData, 0, indexNewData, 0, expectedLen);
indexData = indexNewData;
}
+ long dateFileLen = getDataFileLen();
LOG.info("Read index files {}.index for {} ms", filePrefix, System.currentTimeMillis() - start);
- return new ShuffleIndexResult(indexData);
+ return new ShuffleIndexResult(indexData, dateFileLen);
} catch (Exception e) {
LOG.info("Fail to read index files {}.index", filePrefix, e);
}
return new ShuffleIndexResult();
}
- @Override
protected ShuffleDataResult readShuffleData(ShuffleDataSegment shuffleDataSegment) {
// Here we make an assumption that the rest of the file is corrupted, if an unexpected data is read.
int expectedLength = shuffleDataSegment.getLength();
@@ -115,6 +115,15 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
return data;
}
+ private long getDataFileLen() {
+ try {
+ return dataReader.getFileLen();
+ } catch (IOException ioException) {
+ LOG.error("getDataFileLen failed for " + ShuffleStorageUtils.generateDataFileName(filePrefix), ioException);
+ return -1;
+ }
+ }
+
public synchronized void close() {
try {
dataReader.close();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index d1d0bc7d..b786cfc7 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -140,14 +140,16 @@ public class LocalFileServerReadHandler implements ServerReadHandler {
int indexNum = 0;
int len = 0;
try (LocalFileReader reader = createFileReader(indexFileName)) {
- long fileSize = new File(indexFileName).length();
- indexNum = (int) (fileSize / FileBasedShuffleSegment.SEGMENT_SIZE);
+ long indexFileSize = new File(indexFileName).length();
+ indexNum = (int) (indexFileSize / FileBasedShuffleSegment.SEGMENT_SIZE);
len = indexNum * FileBasedShuffleSegment.SEGMENT_SIZE;
- if (fileSize != len) {
+ if (indexFileSize != len) {
LOG.warn("Maybe the index file: {} is being written due to the shuffle-buffer flushing.", indexFileName);
}
byte[] indexData = reader.read(0, len);
- return new ShuffleIndexResult(indexData);
+ // get dataFileSize for read segment generation in DataSkippableReadHandler#readShuffleData
+ long dataFileSize = new File(dataFileName).length();
+ return new ShuffleIndexResult(indexData, dataFileSize);
} catch (Exception e) {
LOG.error("Fail to read index file {} indexNum {} len {}",
indexFileName, indexNum, len);
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
index 8c5dd7a7..cd2df454 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
@@ -17,10 +17,15 @@
package org.apache.uniffle.storage.handler.impl;
+import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
@@ -30,8 +35,12 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.HdfsShuffleHandlerTestBase;
import org.apache.uniffle.storage.HdfsTestBase;
+import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -99,4 +108,134 @@ public class HdfsShuffleReadHandlerTest extends HdfsTestBase {
public void test() {
createAndRunCases(HDFS_URI, conf, StringUtils.EMPTY);
}
+
+ @Test
+ public void testDataInconsistent() {
+
+ try {
+ String basePath = HDFS_URI + "HdfsShuffleFileReadHandlerTest#testDataInconsistent";
+ TestHdfsShuffleWriteHandler writeHandler =
+ new TestHdfsShuffleWriteHandler(
+ "appId",
+ 0,
+ 1,
+ 1,
+ basePath,
+ "test",
+ conf,
+ StringUtils.EMPTY);
+
+ Map<Long, byte[]> expectedData = Maps.newHashMap();
+ int totalBlockNum = 0;
+ int expectTotalBlockNum = 6;
+ int blockSize = 7;
+ int taskAttemptId = 0;
+
+ // write expectTotalBlockNum - 1 complete block
+ HdfsShuffleHandlerTestBase.writeTestData(writeHandler, expectTotalBlockNum - 1,
+ blockSize, taskAttemptId, expectedData);
+
+ // write 1 incomplete block , which only write index file
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
+ byte[] buf = new byte[blockSize];
+ new Random().nextBytes(buf);
+ long blockId = (expectTotalBlockNum
+ << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+ + taskAttemptId;
+ blocks.add(new ShufflePartitionedBlock(blockSize, blockSize, ChecksumUtils.getCrc32(buf), blockId,
+ taskAttemptId, buf));
+ writeHandler.writeIndex(blocks);
+
+ int readBufferSize = 13;
+ int total = HdfsShuffleHandlerTestBase.calcExpectedSegmentNum(expectTotalBlockNum, blockSize, readBufferSize);
+ Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+ String fileNamePrefix = ShuffleStorageUtils.getFullShuffleDataFolder(basePath,
+ ShuffleStorageUtils.getShuffleDataPathWithRange("appId",
+ 0, 1, 1, 10)) + "/test_0";
+ HdfsShuffleReadHandler handler =
+ new HdfsShuffleReadHandler("appId", 0, 1, fileNamePrefix,
+ readBufferSize, expectBlockIds, processBlockIds, conf);
+
+ Set<Long> actualBlockIds = Sets.newHashSet();
+ for (int i = 0; i < total; ++i) {
+ ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+ totalBlockNum += shuffleDataResult.getBufferSegments().size();
+ HdfsShuffleHandlerTestBase.checkData(shuffleDataResult, expectedData);
+ for (BufferSegment bufferSegment : shuffleDataResult.getBufferSegments()) {
+ actualBlockIds.add(bufferSegment.getBlockId());
+ }
+ }
+
+ assertNull(handler.readShuffleData());
+ assertEquals(
+ total,
+ handler.getShuffleDataSegments().size());
+ // The last block cannot be read, only the index is generated
+ assertEquals(expectTotalBlockNum - 1, totalBlockNum);
+ assertEquals(expectedData.keySet(), actualBlockIds);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ static class TestHdfsShuffleWriteHandler extends HdfsShuffleWriteHandler {
+
+ private Configuration hadoopConf;
+ private Lock writeLock = new ReentrantLock();
+ private String basePath;
+ private String fileNamePrefix;
+ private int failTimes = 0;
+
+ TestHdfsShuffleWriteHandler(
+ String appId,
+ int shuffleId,
+ int startPartition,
+ int endPartition,
+ String storageBasePath,
+ String fileNamePrefix,
+ Configuration hadoopConf,
+ String user) throws Exception {
+ super(appId, shuffleId, startPartition, endPartition, storageBasePath, fileNamePrefix, hadoopConf, user);
+ this.hadoopConf = hadoopConf;
+ this.fileNamePrefix = fileNamePrefix;
+ this.basePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
+ ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, startPartition, endPartition));
+ }
+
+
+ // only write index file
+ public void writeIndex(
+ List<ShufflePartitionedBlock> shuffleBlocks) throws IOException, IllegalStateException {
+ HdfsFileWriter indexWriter = null;
+ writeLock.lock();
+ try {
+ try {
+ String indexFileName = ShuffleStorageUtils.generateIndexFileName(fileNamePrefix + "_" + failTimes);
+ indexWriter = createWriter(indexFileName);
+ for (ShufflePartitionedBlock block : shuffleBlocks) {
+ long blockId = block.getBlockId();
+ long crc = block.getCrc();
+ long startOffset = indexWriter.nextOffset();
+
+ FileBasedShuffleSegment segment = new FileBasedShuffleSegment(
+ blockId, startOffset, block.getLength(), block.getUncompressLength(), crc, block.getTaskAttemptId());
+ indexWriter.writeIndex(segment);
+ }
+ } catch (Exception e) {
+ failTimes++;
+ throw new RuntimeException(e);
+ } finally {
+ if (indexWriter != null) {
+ indexWriter.close();
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ }
}
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java
index 83df32db..a0c0043f 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java
@@ -21,37 +21,24 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.junit.jupiter.api.Test;
-import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
-import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.RssUtils;
-import org.apache.uniffle.storage.handler.api.ServerReadHandler;
-import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class LocalFileHandlerTest {
- private static AtomicLong ATOMIC_LONG = new AtomicLong(0L);
-
@Test
public void writeTest() throws Exception {
File tmpDir = Files.createTempDir();
@@ -76,15 +63,15 @@ public class LocalFileHandlerTest {
final Set<Long> expectedBlockIds1 = Sets.newHashSet();
final Set<Long> expectedBlockIds2 = Sets.newHashSet();
- writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1);
- writeTestData(writeHandler1, 2, 32, expectedData, expectedBlockIds1);
- writeTestData(writeHandler1, 3, 32, expectedData, expectedBlockIds1);
- writeTestData(writeHandler1, 4, 32, expectedData, expectedBlockIds1);
+ LocalFileHandlerTestBase.writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1);
+ LocalFileHandlerTestBase.writeTestData(writeHandler1, 2, 32, expectedData, expectedBlockIds1);
+ LocalFileHandlerTestBase.writeTestData(writeHandler1, 3, 32, expectedData, expectedBlockIds1);
+ LocalFileHandlerTestBase.writeTestData(writeHandler1, 4, 32, expectedData, expectedBlockIds1);
- writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2);
- writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2);
- writeTestData(writeHandler2, 2, 32, expectedData, expectedBlockIds2);
- writeTestData(writeHandler2, 1, 32, expectedData, expectedBlockIds2);
+ LocalFileHandlerTestBase.writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2);
+ LocalFileHandlerTestBase.writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2);
+ LocalFileHandlerTestBase.writeTestData(writeHandler2, 2, 32, expectedData, expectedBlockIds2);
+ LocalFileHandlerTestBase.writeTestData(writeHandler2, 1, 32, expectedData, expectedBlockIds2);
RssBaseConf conf = new RssBaseConf();
conf.setString("rss.storage.basePath", dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath());
@@ -93,21 +80,22 @@ public class LocalFileHandlerTest {
LocalFileServerReadHandler readHandler2 = new LocalFileServerReadHandler(
"appId", 0, 2, 1, 10, dataDir1.getAbsolutePath());
- validateResult(readHandler1, expectedBlockIds1, expectedData);
- validateResult(readHandler2, expectedBlockIds2, expectedData);
+ LocalFileHandlerTestBase.validateResult(readHandler1, expectedBlockIds1, expectedData);
+ LocalFileHandlerTestBase.validateResult(readHandler2, expectedBlockIds2, expectedData);
// after first read, write more data
- writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1);
+ LocalFileHandlerTestBase.writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1);
// new data should be read
- validateResult(readHandler1, expectedBlockIds1, expectedData);
+ LocalFileHandlerTestBase.validateResult(readHandler1, expectedBlockIds1, expectedData);
- File targetDataFile = new File(possiblePath1, "pre.data");
- ShuffleIndexResult shuffleIndexResult = readIndex(readHandler1);
+ ShuffleIndexResult shuffleIndexResult = LocalFileHandlerTestBase.readIndex(readHandler1);
assertFalse(shuffleIndexResult.isEmpty());
- List<ShuffleDataResult> shuffleDataResults = readData(readHandler1, shuffleIndexResult);
+ assertEquals(352, shuffleIndexResult.getDataFileLen());
+ List<ShuffleDataResult> shuffleDataResults = LocalFileHandlerTestBase.readData(readHandler1, shuffleIndexResult);
assertFalse(shuffleDataResults.isEmpty());
+ File targetDataFile = new File(possiblePath1, "pre.data");
targetDataFile.delete();
- shuffleDataResults = readData(readHandler1, shuffleIndexResult);
+ shuffleDataResults = LocalFileHandlerTestBase.readData(readHandler1, shuffleIndexResult);
for (ShuffleDataResult shuffleData : shuffleDataResults) {
assertEquals(0, shuffleData.getData().length);
assertTrue(shuffleData.isEmpty());
@@ -129,71 +117,9 @@ public class LocalFileHandlerTest {
assertEquals(writer.nextOffset(), totalSize);
}
+ @Test
+ public void testReadIndex() {
- private void writeTestData(
- ShuffleWriteHandler writeHandler,
- int num, int length,
- Map<Long, byte[]> expectedData,
- Set<Long> expectedBlockIds) throws Exception {
- List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
- for (int i = 0; i < num; i++) {
- byte[] buf = new byte[length];
- new Random().nextBytes(buf);
- long blockId = ATOMIC_LONG.incrementAndGet();
- blocks.add(new ShufflePartitionedBlock(length, length, ChecksumUtils.getCrc32(buf), blockId, 100,
- buf));
- expectedData.put(blockId, buf);
- expectedBlockIds.add(blockId);
- }
- writeHandler.write(blocks);
- }
-
- protected void validateResult(ServerReadHandler readHandler, Set<Long> expectedBlockIds,
- Map<Long, byte[]> expectedData) {
- List<ShuffleDataResult> shuffleDataResults = readAll(readHandler);
- Set<Long> actualBlockIds = Sets.newHashSet();
- for (ShuffleDataResult sdr : shuffleDataResults) {
- byte[] buffer = sdr.getData();
- List<BufferSegment> bufferSegments = sdr.getBufferSegments();
-
- for (BufferSegment bs : bufferSegments) {
- byte[] data = new byte[bs.getLength()];
- System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength());
- assertEquals(bs.getCrc(), ChecksumUtils.getCrc32(data));
- assertArrayEquals(expectedData.get(bs.getBlockId()), data);
- actualBlockIds.add(bs.getBlockId());
- }
- }
- assertEquals(expectedBlockIds, actualBlockIds);
- }
-
- private List<ShuffleDataResult> readAll(ServerReadHandler readHandler) {
- ShuffleIndexResult shuffleIndexResult = readIndex(readHandler);
- return readData(readHandler, shuffleIndexResult);
- }
-
- private ShuffleIndexResult readIndex(ServerReadHandler readHandler) {
- ShuffleIndexResult shuffleIndexResult = readHandler.getShuffleIndex();
- return shuffleIndexResult;
- }
-
- private List<ShuffleDataResult> readData(ServerReadHandler readHandler, ShuffleIndexResult shuffleIndexResult) {
- List<ShuffleDataResult> shuffleDataResults = Lists.newLinkedList();
- if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
- return shuffleDataResults;
- }
-
- List<ShuffleDataSegment> shuffleDataSegments =
- RssUtils.transIndexDataToSegments(shuffleIndexResult, 32);
-
- for (ShuffleDataSegment shuffleDataSegment : shuffleDataSegments) {
- byte[] shuffleData =
- readHandler.getShuffleData(shuffleDataSegment.getOffset(), shuffleDataSegment.getLength()).getData();
- ShuffleDataResult sdr = new ShuffleDataResult(shuffleData, shuffleDataSegment.getBufferSegments());
- shuffleDataResults.add(sdr);
- }
-
- return shuffleDataResults;
}
}
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
new file mode 100644
index 00000000..ae7f0541
--- /dev/null
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleDataSegment;
+import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
+import org.apache.uniffle.storage.handler.api.ServerReadHandler;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class LocalFileHandlerTestBase {
+ private static AtomicLong ATOMIC_LONG = new AtomicLong(0L);
+
+ public static void writeTestData(
+ ShuffleWriteHandler writeHandler,
+ int num, int length,
+ Map<Long, byte[]> expectedData,
+ Set<Long> expectedBlockIds) throws Exception {
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
+ for (int i = 0; i < num; i++) {
+ byte[] buf = new byte[length];
+ new Random().nextBytes(buf);
+ long blockId = ATOMIC_LONG.incrementAndGet();
+ blocks.add(new ShufflePartitionedBlock(length, length, ChecksumUtils.getCrc32(buf), blockId, 100,
+ buf));
+ expectedData.put(blockId, buf);
+ expectedBlockIds.add(blockId);
+ }
+ writeHandler.write(blocks);
+ }
+
+ public static void validateResult(ServerReadHandler readHandler, Set<Long> expectedBlockIds,
+ Map<Long, byte[]> expectedData) {
+ List<ShuffleDataResult> shuffleDataResults = readAll(readHandler);
+ Set<Long> actualBlockIds = Sets.newHashSet();
+ for (ShuffleDataResult sdr : shuffleDataResults) {
+ byte[] buffer = sdr.getData();
+ List<BufferSegment> bufferSegments = sdr.getBufferSegments();
+
+ for (BufferSegment bs : bufferSegments) {
+ byte[] data = new byte[bs.getLength()];
+ System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength());
+ assertEquals(bs.getCrc(), ChecksumUtils.getCrc32(data));
+ assertArrayEquals(expectedData.get(bs.getBlockId()), data);
+ actualBlockIds.add(bs.getBlockId());
+ }
+ }
+ assertEquals(expectedBlockIds, actualBlockIds);
+ }
+
+ public static List<ShuffleDataResult> readAll(ServerReadHandler readHandler) {
+ ShuffleIndexResult shuffleIndexResult = readIndex(readHandler);
+ return readData(readHandler, shuffleIndexResult);
+ }
+
+ public static ShuffleIndexResult readIndex(ServerReadHandler readHandler) {
+ ShuffleIndexResult shuffleIndexResult = readHandler.getShuffleIndex();
+ return shuffleIndexResult;
+ }
+
+ public static List<ShuffleDataResult> readData(ServerReadHandler readHandler, ShuffleIndexResult shuffleIndexResult) {
+ List<ShuffleDataResult> shuffleDataResults = Lists.newLinkedList();
+ if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
+ return shuffleDataResults;
+ }
+
+ List<ShuffleDataSegment> shuffleDataSegments =
+ RssUtils.transIndexDataToSegments(shuffleIndexResult, 32);
+
+ for (ShuffleDataSegment shuffleDataSegment : shuffleDataSegments) {
+ byte[] shuffleData =
+ readHandler.getShuffleData(shuffleDataSegment.getOffset(), shuffleDataSegment.getLength()).getData();
+ ShuffleDataResult sdr = new ShuffleDataResult(shuffleData, shuffleDataSegment.getBufferSegments());
+ shuffleDataResults.add(sdr);
+ }
+
+ return shuffleDataResults;
+ }
+
+ public static void checkData(ShuffleDataResult shuffleDataResult, Map<Long, byte[]> expectedData) {
+ byte[] buffer = shuffleDataResult.getData();
+ List<BufferSegment> bufferSegments = shuffleDataResult.getBufferSegments();
+
+ for (BufferSegment bs : bufferSegments) {
+ byte[] data = new byte[bs.getLength()];
+ System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength());
+ assertEquals(bs.getCrc(), ChecksumUtils.getCrc32(data));
+ assertArrayEquals(expectedData.get(bs.getBlockId()), data);
+ }
+ }
+
+ public static void writeIndex(ByteBuffer byteBuffer, FileBasedShuffleSegment segment) {
+ byteBuffer.putLong(segment.getOffset());
+ byteBuffer.putInt(segment.getLength());
+ byteBuffer.putInt(segment.getUncompressLength());
+ byteBuffer.putLong(segment.getCrc());
+ byteBuffer.putLong(segment.getBlockId());
+ byteBuffer.putLong(segment.getTaskAttemptId());
+ }
+
+ public static List<byte[]> calcSegmentBytes(Map<Long, byte[]> blockIdToData, int bytesPerSegment, int blockNum) {
+ List<byte[]> res = Lists.newArrayList();
+ int curSize = 0;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(2 * bytesPerSegment);
+ for (long i = 1; i <= blockNum; i++) {
+ byte[] data = blockIdToData.get(i);
+ byteBuffer.put(data, 0, data.length);
+ curSize += data.length;
+ if (curSize >= bytesPerSegment) {
+ byte[] newByte = new byte[curSize];
+ System.arraycopy(byteBuffer.array(), 0, newByte, 0, curSize);
+ res.add(newByte);
+ byteBuffer.clear();
+ curSize = 0;
+ }
+ }
+ if (curSize > 0) {
+ byte[] newByte = new byte[curSize];
+ System.arraycopy(byteBuffer.array(), 0, newByte, 0, curSize);
+ res.add(newByte);
+ }
+ return res;
+ }
+}
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
new file mode 100644
index 00000000..d25f44f4
--- /dev/null
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.client.api.ShuffleServerClient;
+import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
+import org.apache.uniffle.client.response.ResponseStatusCode;
+import org.apache.uniffle.client.response.RssGetShuffleDataResponse;
+import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+ /**
+  * Exercises {@code LocalFileClientReadHandler} against a mocked shuffle server whose index describes
+  * more blocks than the reported data file length covers.
+  */
+ public class LocalFileServerReadHandlerTest {
+
+   /**
+    * Builds index entries for 4 blocks but reports a data file length that covers only 3 of them,
+    * then checks that reading through the handler returns exactly those 3 blocks with intact content.
+    */
+   @Test
+   public void testDataInconsistent() throws Exception {
+     final int indexedBlockCount = 4;
+     final int bytesPerBlock = 7;
+     // One index entry as written by LocalFileHandlerTestBase.writeIndex: 8 + 4 + 4 + 8 + 8 + 8 bytes.
+     final int indexEntryBytes = 40;
+
+     final Map<Long, byte[]> blockIdToBytes = Maps.newHashMap();
+     final ByteBuffer indexBuffer = ByteBuffer.allocate(indexedBlockCount * indexEntryBytes);
+     final Roaring64NavigableMap expectedIds = Roaring64NavigableMap.bitmapOf();
+
+     // Generate the test blocks and record an index entry for each one, with back-to-back data offsets.
+     LocalFileHandlerTestBase.writeTestData(blocks -> {
+       int nextOffset = 0;
+       for (ShufflePartitionedBlock block : blocks) {
+         LocalFileHandlerTestBase.writeIndex(indexBuffer, new FileBasedShuffleSegment(
+             block.getBlockId(), nextOffset, block.getLength(), block.getUncompressLength(),
+             block.getCrc(), block.getTaskAttemptId()));
+         nextOffset += block.getLength();
+       }
+     }, indexedBlockCount, bytesPerBlock, blockIdToBytes, new HashSet<>());
+     for (Long blockId : blockIdToBytes.keySet()) {
+       expectedIds.addLong(blockId);
+     }
+
+     final String appId = "app1";
+     final int shuffleId = 1;
+     final int partitionId = 1;
+     final ShuffleServerClient client = Mockito.mock(ShuffleServerClient.class);
+
+     // The data file is one block short of what the index describes.
+     final int writtenBlockCount = indexedBlockCount - 1;
+     final int dataFileLen = bytesPerBlock * writtenBlockCount;
+     final RssGetShuffleIndexResponse indexResponse = new RssGetShuffleIndexResponse(
+         ResponseStatusCode.SUCCESS, indexBuffer.array(), dataFileLen);
+     Mockito.doReturn(indexResponse).when(client).getShuffleIndex(Mockito.any());
+
+     final int readBufferSize = 13;
+     final int bytesPerSegment = (readBufferSize / bytesPerBlock + 1) * bytesPerBlock;
+     final List<byte[]> segmentBytes = LocalFileHandlerTestBase.calcSegmentBytes(
+         blockIdToBytes, bytesPerSegment, writtenBlockCount);
+
+     // The first read covers two blocks starting at offset 0 ...
+     final ArgumentMatcher<RssGetShuffleDataRequest> firstSegment =
+         req -> req.getOffset() == 0 && req.getLength() == bytesPerSegment;
+     // ... and the second read covers the single remaining block.
+     final ArgumentMatcher<RssGetShuffleDataRequest> secondSegment =
+         req -> req.getOffset() == bytesPerSegment && req.getLength() == bytesPerBlock;
+     final RssGetShuffleDataResponse firstResponse =
+         new RssGetShuffleDataResponse(ResponseStatusCode.SUCCESS, segmentBytes.get(0));
+     final RssGetShuffleDataResponse secondResponse =
+         new RssGetShuffleDataResponse(ResponseStatusCode.SUCCESS, segmentBytes.get(1));
+
+     // NOTE(review): keep the doReturn(...).when(...) form - the matchers dereference the request, and
+     // Mockito.when(client.getShuffleData(argThat(...))) would presumably evaluate them on a null argument.
+     Mockito.doReturn(firstResponse).when(client).getShuffleData(Mockito.argThat(firstSegment));
+     Mockito.doReturn(secondResponse).when(client).getShuffleData(Mockito.argThat(secondSegment));
+
+     final Roaring64NavigableMap processedIds = Roaring64NavigableMap.bitmapOf();
+     // NOTE(review): partitionId is passed before shuffleId here - confirm against the constructor's
+     // parameter order; both are 1, so the outcome of this test is the same either way.
+     final LocalFileClientReadHandler handler = new LocalFileClientReadHandler(
+         appId, partitionId, shuffleId, -1, 1, 1,
+         readBufferSize, expectedIds, processedIds, client);
+
+     final int expectedReads = dataFileLen / bytesPerSegment + 1;
+     int blocksRead = 0;
+     for (int read = 0; read < expectedReads; read++) {
+       final ShuffleDataResult result = handler.readShuffleData();
+       LocalFileHandlerTestBase.checkData(result, blockIdToBytes);
+       blocksRead += result.getBufferSegments().size();
+     }
+     assertEquals(writtenBlockCount, blocksRead);
+   }
+
+ }