You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/09/28 15:03:12 UTC
[hudi] branch master updated: [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9966b2c556 [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015)
9966b2c556 is described below
commit 9966b2c5560db6cc844ba5108d536f6fc2f43ad8
Author: hj2016 <hj...@163.com>
AuthorDate: Wed Sep 28 23:02:59 2022 +0800
[HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015)
Co-authored-by: huangjing02 <hu...@bilibili.com>
Co-authored-by: sivabalan <n....@gmail.com>
---
.../hudi/common/table/log/HoodieLogFileReader.java | 15 ++--
.../common/functional/TestHoodieLogFormat.java | 79 ++++++++++++++--------
2 files changed, 57 insertions(+), 37 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index c784684cc0..2e2af79823 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -150,13 +150,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
// for max of Integer size
private HoodieLogBlock readBlock() throws IOException {
int blockSize;
+ long blockStartPos = inputStream.getPos();
try {
// 1 Read the total size of the block
blockSize = (int) inputStream.readLong();
} catch (EOFException | CorruptedLogFileException e) {
// An exception reading any of the above indicates a corrupt block
// Create a corrupt block by finding the next MAGIC marker or EOF
- return createCorruptBlock();
+ return createCorruptBlock(blockStartPos);
}
// We may have had a crash which could have written this block partially
@@ -164,7 +165,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
// block) or EOF. If we did not find either of it, then this block is a corrupted block.
boolean isCorrupted = isBlockCorrupted(blockSize);
if (isCorrupted) {
- return createCorruptBlock();
+ return createCorruptBlock(blockStartPos);
}
// 2. Read the version for this log format
@@ -253,14 +254,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
return HoodieLogBlockType.values()[type];
}
- private HoodieLogBlock createCorruptBlock() throws IOException {
- LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
- long currentPos = inputStream.getPos();
+ private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException {
+ LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos);
+ inputStream.seek(blockStartPos);
long nextBlockOffset = scanForNextAvailableBlockOffset();
// Rewind to the initial start and read corrupted bytes till the nextBlockOffset
- inputStream.seek(currentPos);
+ inputStream.seek(blockStartPos);
LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
- int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
+ int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
long contentPosition = inputStream.getPos();
Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 6d084f8e17..f87e5a41b8 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -700,20 +700,11 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
@Test
public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
- List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
- header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
- header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
- writer.appendBlock(dataBlock);
- writer.close();
+ HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
// Append some arbit byte[] to the end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
- FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
+ FSDataOutputStream outputStream = fs.append(logFile.getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
// Write out a length that does not conform with the content
@@ -728,17 +719,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
outputStream.close();
// Append a proper block that is of the missing length of the corrupted block
- writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
- records = SchemaTestUtil.generateTestRecords(0, 10);
- header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
- writer.appendBlock(dataBlock);
- writer.close();
+ logFile = addValidBlock("test-fileId1", "100", 10);
// First round of reads - we should be able to read the first block and then EOF
- Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+ Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -751,7 +735,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
reader.close();
// Simulate another failure back to back
- outputStream = fs.append(writer.getLogFile().getPath());
+ outputStream = fs.append(logFile.getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
// Write out a length that does not conform with the content
@@ -766,17 +750,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
outputStream.close();
// Should be able to append a new block
- writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
- records = SchemaTestUtil.generateTestRecords(0, 100);
- header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
- writer.appendBlock(dataBlock);
- writer.close();
+ logFile = addValidBlock("test-fileId1", "100", 100);
// Second round of reads - we should be able to read the first and last block
- reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+ reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
@@ -792,6 +769,48 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
reader.close();
}
+  /**
+   * Verifies that a block for which only the MAGIC marker was written (nothing after it) is returned
+   * by the reader as a corrupt block, and that the valid block appended after it is still returned.
+   */
+  @Test
+  public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException {
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
+
+    // Append just magic bytes and move onto next block
+    fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
+    // try-with-resources so the stream is closed even if the write fails
+    try (FSDataOutputStream outputStream = fs.append(logFile.getPath())) {
+      outputStream.write(HoodieLogFormat.MAGIC);
+      outputStream.flush();
+    }
+
+    // Append a proper block
+    logFile = addValidBlock("test-fileId1", "100", 10);
+
+    // Expect three blocks in order: the first valid block, the corrupt (magic-only) block,
+    // and then the valid block appended afterwards. The reader is closed even if an assertion fails.
+    try (Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema())) {
+      assertTrue(reader.hasNext(), "First block should be available");
+      reader.next();
+      assertTrue(reader.hasNext(), "We should have corrupted block next");
+      HoodieLogBlock block = reader.next();
+      assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
+      assertTrue(reader.hasNext(), "Third block should be available");
+      reader.next();
+      assertFalse(reader.hasNext(), "There should be no more block left");
+    }
+  }
+
+  /**
+   * Appends one data block of generated test records to the delta log file of the given file id.
+   *
+   * @param fileId     file id handed to the log writer builder
+   * @param commitTime base commit handed to the log writer builder; also stored as the block's INSTANT_TIME header
+   * @param numRecords number of test records generated for the block
+   * @return the log file reported by the writer after the block was appended and the writer closed
+   */
+  private HoodieLogFile addValidBlock(String fileId, String commitTime, int numRecords) throws IOException, URISyntaxException, InterruptedException {
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId(fileId).overBaseCommit(commitTime).withFs(fs).build();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numRecords);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    // Honor the caller's commit time rather than a hard-coded "100", so the header matches the base commit
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
+    writer.appendBlock(dataBlock);
+    writer.close();
+    return writer.getLogFile();
+  }
+
@Test
public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
Writer writer =