Posted to commits@hudi.apache.org by si...@apache.org on 2022/09/28 15:03:12 UTC

[hudi] branch master updated: [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9966b2c556 [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015)
9966b2c556 is described below

commit 9966b2c5560db6cc844ba5108d536f6fc2f43ad8
Author: hj2016 <hj...@163.com>
AuthorDate: Wed Sep 28 23:02:59 2022 +0800

    [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015)
    
    
    Co-authored-by: huangjing02 <hu...@bilibili.com>
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../hudi/common/table/log/HoodieLogFileReader.java | 15 ++--
 .../common/functional/TestHoodieLogFormat.java     | 79 ++++++++++++++--------
 2 files changed, 57 insertions(+), 37 deletions(-)
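
For context on the fix: readBlock() first reads the block's size with inputStream.readLong(). If that read throws (EOFException or CorruptedLogFileException), or the size fails the later sanity check, the reader falls back to createCorruptBlock(), which scans forward for the next MAGIC marker and packages everything up to it as a corrupt block. Before this patch, createCorruptBlock() sampled inputStream.getPos() only after the failed read had already advanced the stream, so the scan could start past the next block's marker and fold a perfectly healthy block into the corrupt one. The patch captures the position before any block bytes are read and rewinds to it. A minimal sketch of the fixed control flow (class and helper names are illustrative, not Hudi APIs; java.io.RandomAccessFile stands in for the FSDataInputStream the real reader wraps, and the magic scan is stubbed out):

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    // Illustrative sketch only -- not the actual Hudi classes.
    public class CorruptBlockRecoverySketch {
      private final RandomAccessFile input;

      public CorruptBlockRecoverySketch(RandomAccessFile input) {
        this.input = input;
      }

      public byte[] readBlock() throws IOException {
        // The essence of the fix: capture the position BEFORE reading any
        // block bytes, so recovery can rewind to the true block start.
        long blockStartPos = input.getFilePointer();
        long blockSize;
        try {
          blockSize = input.readLong();
        } catch (EOFException e) {
          // A failed read means a partial/corrupt block; recover from the
          // recorded start, not from wherever the failed read left us.
          return createCorruptBlock(blockStartPos);
        }
        // (The real reader also sanity-checks blockSize here and takes the
        // same createCorruptBlock(blockStartPos) path if it is implausible.)
        byte[] content = new byte[(int) blockSize];
        input.readFully(content);
        return content;
      }

      private byte[] createCorruptBlock(long blockStartPos) throws IOException {
        // Rewind FIRST, then scan forward for the next magic marker.
        // Scanning from the post-read position could start past a healthy
        // block's marker and silently swallow it -- the HUDI-2780 bug.
        input.seek(blockStartPos);
        long nextBlockOffset = scanForNextMagicOrEof();
        input.seek(blockStartPos);
        byte[] corruptBytes = new byte[(int) (nextBlockOffset - blockStartPos)];
        input.readFully(corruptBytes);
        return corruptBytes;
      }

      private long scanForNextMagicOrEof() throws IOException {
        // Stub: Hudi scans for the next #HUDI# marker; skipping straight to
        // EOF keeps this sketch self-contained.
        return input.length();
      }
    }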

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index c784684cc0..2e2af79823 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -150,13 +150,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   // for max of Integer size
   private HoodieLogBlock readBlock() throws IOException {
     int blockSize;
+    long blockStartPos = inputStream.getPos();
     try {
       // 1. Read the total size of the block
       blockSize = (int) inputStream.readLong();
     } catch (EOFException | CorruptedLogFileException e) {
       // An exception reading any of the above indicates a corrupt block
       // Create a corrupt block by finding the next MAGIC marker or EOF
-      return createCorruptBlock();
+      return createCorruptBlock(blockStartPos);
     }
 
     // We may have had a crash which could have written this block partially
@@ -164,7 +165,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
     // block) or EOF. If we did not find either of them, then this block is a corrupted block.
     boolean isCorrupted = isBlockCorrupted(blockSize);
     if (isCorrupted) {
-      return createCorruptBlock();
+      return createCorruptBlock(blockStartPos);
     }
 
     // 2. Read the version for this log format
@@ -253,14 +254,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
     return HoodieLogBlockType.values()[type];
   }
 
-  private HoodieLogBlock createCorruptBlock() throws IOException {
-    LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
-    long currentPos = inputStream.getPos();
+  private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException {
+    LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos);
+    inputStream.seek(blockStartPos);
     long nextBlockOffset = scanForNextAvailableBlockOffset();
     // Rewind to the initial start and read corrupted bytes till the nextBlockOffset
-    inputStream.seek(currentPos);
+    inputStream.seek(blockStartPos);
     LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
-    int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
+    int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
     long contentPosition = inputStream.getPos();
     Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
     HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
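
To make the failure concrete, here is the layout the new test below constructs -- a writer crashed right after emitting the six MAGIC bytes of a block, and a later writer appended a healthy block (the diagram is illustrative, not part of the commit):

    ... [data block 1] #HUDI# #HUDI# <header + content of data block 2> ...
                       stray   marker of the healthy block 2
                       marker

blockStartPos is the position just after the stray marker (hasNext() has already consumed it), i.e. exactly at block 2's marker. Before the fix, readLong() consumed eight more bytes -- all six of block 2's marker plus two header bytes -- before getPos() was sampled, so scanForNextAvailableBlockOffset() started inside block 2, missed its marker, and ran on to the following marker or EOF, silently folding the whole healthy block into the corrupt one. With the fix, the reader seeks back to blockStartPos before scanning: the scan hits block 2's marker immediately, the corrupt block comes out empty, and block 2 is read intact.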
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 6d084f8e17..f87e5a41b8 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -700,20 +700,11 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
   @Test
   public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
 
     // Append some arbitrary byte[] to the end of the log (mimics a partially written commit)
     fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
-    FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
+    FSDataOutputStream outputStream = fs.append(logFile.getPath());
     // create a corrupt block: a magic marker followed by bogus metadata
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not conform to the content
@@ -728,17 +719,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     outputStream.close();
 
     // Append a proper block that is of the missing length of the corrupted block
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 10);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    logFile = addValidBlock("test-fileId1", "100", 10);
 
     // First round of reads - we should be able to read the first block and then EOF
-    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+    Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -751,7 +735,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
 
     // Simulate another failure back to back
-    outputStream = fs.append(writer.getLogFile().getPath());
+    outputStream = fs.append(logFile.getPath());
     // create a corrupt block: a magic marker followed by bogus metadata
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not conform to the content
@@ -766,17 +750,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     outputStream.close();
 
     // Should be able to append a new block
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 100);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    logFile = addValidBlock("test-fileId1", "100", 100);
 
     // Second round of reads - we should be able to read the first and last block
-    reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+    reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
@@ -792,6 +769,48 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
   }
 
+  @Test
+  public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException {
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
+
+    // Append just the magic bytes, mimicking a crash right after the block marker was written
+    fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
+    FSDataOutputStream outputStream = fs.append(logFile.getPath());
+    outputStream.write(HoodieLogFormat.MAGIC);
+    outputStream.flush();
+    outputStream.close();
+
+    // Append a proper block
+    logFile = addValidBlock("test-fileId1", "100", 10);
+
+    // We should read the first data block, then the corrupt (magic-only) block, then the appended block
+    Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
+    assertTrue(reader.hasNext(), "First block should be available");
+    reader.next();
+    assertTrue(reader.hasNext(), "We should have corrupted block next");
+    HoodieLogBlock block = reader.next();
+    assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
+    assertTrue(reader.hasNext(), "Third block should be available");
+    reader.next();
+    assertFalse(reader.hasNext(), "There should be no more block left");
+
+    reader.close();
+  }
+
+  private HoodieLogFile addValidBlock(String fileId, String commitTime, int numRecords) throws IOException, URISyntaxException, InterruptedException {
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId(fileId).overBaseCommit(commitTime).withFs(fs).build();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numRecords);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
+    writer.appendBlock(dataBlock);
+    writer.close();
+    return writer.getLogFile();
+  }
+
   @Test
   public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =