Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/25 07:52:42 UTC

[hudi] branch master updated: [HUDI-2118] Skip checking corrupt log blocks for transactional write file systems (#6830)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 65044d38fbe [HUDI-2118] Skip checking corrupt log blocks for transactional write file systems (#6830)
65044d38fbe is described below

commit 65044d38fbefc6af644be02197bb8b1911746202
Author: Bowen Zhu <bo...@gmail.com>
AuthorDate: Tue Jan 24 23:52:33 2023 -0800

    [HUDI-2118] Skip checking corrupt log blocks for transactional write file systems (#6830)
    
    Skip checking corrupt log blocks for transactional write file systems
    
    Impact
    Benchmark results show the corrupted-block check can take hundreds of milliseconds for larger file sizes.
    This change speeds up block reads by hundreds of milliseconds per block.
    
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../org/apache/hudi/common/fs/StorageSchemes.java  | 67 +++++++++++++--------
 .../hudi/common/table/log/HoodieLogFileReader.java |  7 +++
 .../common/functional/TestHoodieLogFormat.java     | 70 ++++++++++++++++++++++
 3 files changed, 118 insertions(+), 26 deletions(-)
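
For context, the change below teaches HoodieLogFileReader to consult the new
StorageSchemes.isWriteTransactional(scheme) helper before running the corrupt-block
scan. A condensed, illustrative sketch of that control flow (simplified from the
diffs that follow, not part of the diff itself):

    // In HoodieLogFileReader (simplified): object stores with atomic writes,
    // such as S3 or GCS, can never expose a partially written log block, so the
    // footer-based corruption scan can be skipped entirely.
    private boolean isBlockCorrupted(int blocksize) throws IOException {
      if (StorageSchemes.isWriteTransactional(fs.getScheme())) {
        return false; // see https://issues.apache.org/jira/browse/HUDI-2118
      }
      // ... otherwise fall through to the existing footer/size validation ...
    }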

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java
index 9b5af8bc648..cb8b4c51181 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java
@@ -25,62 +25,65 @@ import java.util.Arrays;
  */
 public enum StorageSchemes {
   // Local filesystem
-  FILE("file", false),
+  FILE("file", false, false),
   // Hadoop File System
-  HDFS("hdfs", true),
+  HDFS("hdfs", true, false),
   // Baidu Advanced File System
-  AFS("afs", true),
+  AFS("afs", true, null),
   // Mapr File System
-  MAPRFS("maprfs", true),
+  MAPRFS("maprfs", true, null),
   // Apache Ignite FS
-  IGNITE("igfs", true),
+  IGNITE("igfs", true, null),
   // AWS S3
-  S3A("s3a", false), S3("s3", false),
+  S3A("s3a", false, true), S3("s3", false, true),
   // Google Cloud Storage
-  GCS("gs", false),
+  GCS("gs", false, true),
   // Azure WASB
-  WASB("wasb", false), WASBS("wasbs", false),
+  WASB("wasb", false, null), WASBS("wasbs", false, null),
   // Azure ADLS
-  ADL("adl", false),
+  ADL("adl", false, null),
   // Azure ADLS Gen2
-  ABFS("abfs", false), ABFSS("abfss", false),
+  ABFS("abfs", false, null), ABFSS("abfss", false, null),
   // Aliyun OSS
-  OSS("oss", false),
+  OSS("oss", false, null),
   // View FS for federated setups. If federating across cloud stores, then append support is false
-  VIEWFS("viewfs", true),
+  VIEWFS("viewfs", true, null),
   //ALLUXIO
-  ALLUXIO("alluxio", false),
+  ALLUXIO("alluxio", false, null),
   // Tencent Cloud Object Storage
-  COSN("cosn", false),
+  COSN("cosn", false, null),
   // Tencent Cloud HDFS
-  CHDFS("ofs", true),
+  CHDFS("ofs", true, null),
   // Tencent Cloud CacheFileSystem
-  GOOSEFS("gfs", false),
+  GOOSEFS("gfs", false, null),
   // Databricks file system
-  DBFS("dbfs", false),
+  DBFS("dbfs", false, null),
   // IBM Cloud Object Storage
-  COS("cos", false),
+  COS("cos", false, null),
   // Huawei Cloud Object Storage
-  OBS("obs", false),
+  OBS("obs", false, null),
   // Kingsoft Standard Storage ks3
-  KS3("ks3", false),
+  KS3("ks3", false, null),
   // JuiceFileSystem
-  JFS("jfs", true),
+  JFS("jfs", true, null),
   // Baidu Object Storage
-  BOS("bos", false),
+  BOS("bos", false, null),
   // Oracle Cloud Infrastructure Object Storage
-  OCI("oci", false),
+  OCI("oci", false, null),
   // Volcengine Object Storage
-  TOS("tos", false),
+  TOS("tos", false, null),
   // Volcengine Cloud HDFS
-  CFS("cfs", true);
+  CFS("cfs", true, null);
 
   private String scheme;
   private boolean supportsAppend;
+  // null means it is not yet known whether writes are transactional; please update this per FS as it is verified
+  private Boolean isWriteTransactional;
 
-  StorageSchemes(String scheme, boolean supportsAppend) {
+  StorageSchemes(String scheme, boolean supportsAppend, Boolean isWriteTransactional) {
     this.scheme = scheme;
     this.supportsAppend = supportsAppend;
+    this.isWriteTransactional = isWriteTransactional;
   }
 
   public String getScheme() {
@@ -91,6 +94,10 @@ public enum StorageSchemes {
     return supportsAppend;
   }
 
+  public boolean isWriteTransactional() {
+    return isWriteTransactional != null && isWriteTransactional;
+  }
+
   public static boolean isSchemeSupported(String scheme) {
     return Arrays.stream(values()).anyMatch(s -> s.getScheme().equals(scheme));
   }
@@ -101,4 +108,12 @@ public enum StorageSchemes {
     }
     return Arrays.stream(StorageSchemes.values()).anyMatch(s -> s.supportsAppend() && s.scheme.equals(scheme));
   }
+
+  public static boolean isWriteTransactional(String scheme) {
+    if (!isSchemeSupported(scheme)) {
+      throw new IllegalArgumentException("Unsupported scheme :" + scheme);
+    }
+
+    return Arrays.stream(StorageSchemes.values()).anyMatch(s -> s.isWriteTransactional() && s.scheme.equals(scheme));
+  }
 }
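
To illustrate the tri-state flag above: only an explicit true opts a scheme out of
the corruption check; null is treated the same as false, so unclassified file
systems stay on the conservative path. A few example calls (illustrative, based on
the enum values in this diff):

    StorageSchemes.isWriteTransactional("s3a");     // true  - S3 writes are atomic
    StorageSchemes.isWriteTransactional("hdfs");    // false - explicitly non-transactional
    StorageSchemes.isWriteTransactional("oss");     // false - null means "uncertain", so keep checking
    StorageSchemes.isWriteTransactional("unknown"); // throws IllegalArgumentException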
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index adbc0a1e1b3..0542a3b7da0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log;
 import org.apache.hudi.common.fs.BoundedFsDataInputStream;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
+import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.fs.TimedFSDataInputStream;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -72,6 +73,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
   private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
 
+  private final FileSystem fs;
   private final Configuration hadoopConf;
   private final FSDataInputStream inputStream;
   private final HoodieLogFile logFile;
@@ -107,6 +109,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
                              boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
                              String keyField, InternalSchema internalSchema) throws IOException {
+    this.fs = fs;
     this.hadoopConf = fs.getConf();
     // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
     //       is prefixed with an appropriate scheme given that we're not propagating the FS
@@ -282,6 +285,10 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   }
 
   private boolean isBlockCorrupted(int blocksize) throws IOException {
+    if (StorageSchemes.isWriteTransactional(fs.getScheme())) {
+      // skip the corrupt-block check if writes are transactional; see https://issues.apache.org/jira/browse/HUDI-2118
+      return false;
+    }
     long currentPos = inputStream.getPos();
     long blockSizeFromFooter;
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 4c12c5e8aa5..a8828514eeb 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -85,9 +85,11 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -112,6 +114,7 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests hoodie log format {@link HoodieLogFormat}.
@@ -914,6 +917,34 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
   }
 
+  @Test
+  public void testSkipCorruptedCheck() throws Exception {
+    // normal case: if the block is corrupted, we should be able to read back a corrupted block
+    Reader reader1 = createCorruptedFile("test-fileid1");
+    HoodieLogBlock block = reader1.next();
+    assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
+    reader1.close();
+
+    // Since we can't mock a private method or test it directly, we take this route:
+    // add a corrupted block which would have been skipped on a transactional write file system, so calling next() on the log block reader will fail.
+    Reader reader2 = createCorruptedFile("test-fileid2");
+    assertTrue(reader2.hasNext(), "We should have a corrupted block next");
+
+    // mock the fs to be GCS to skip the isBlockCorrupted() check
+    Field f1 = reader2.getClass().getDeclaredField("fs");
+    f1.setAccessible(true);
+    FileSystem spyfs = Mockito.spy(fs);
+    when(spyfs.getScheme()).thenReturn("gs");
+    f1.set(reader2, spyfs);
+
+    // expect an exception for the block type since the block is corrupted
+    Exception exception = assertThrows(IllegalArgumentException.class, () -> {
+      reader2.next();
+    });
+    assertTrue(exception.getMessage().contains("Invalid block byte type found"));
+    reader2.close();
+  }
+
   @Test
   public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException {
     HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
@@ -2648,4 +2679,43 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     copy.sort(Comparator.comparing(r -> ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()));
     return copy;
   }
+
+  private HoodieLogFormat.Reader createCorruptedFile(String fileId) throws Exception {
+    // create a log file whose last block is corrupted
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId(fileId).overBaseCommit("100").withFs(fs).build();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
+    writer.appendBlock(dataBlock);
+    writer.close();
+
+    // Append some arbitrary bytes to the end of the log (mimics a partially written commit)
+    fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
+    FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
+    // hand-craft a corrupt block, starting with the magic bytes
+    outputStream.write(HoodieLogFormat.MAGIC);
+    // Write out a length that does not conform to the content
+    outputStream.writeLong(473);
+    outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
+    outputStream.writeInt(10000); // an invalid block type
+
+    // Write out a length that does not conform to the content
+    outputStream.writeLong(400);
+    // Write out incomplete content
+    outputStream.write("something-random".getBytes());
+    outputStream.flush();
+    outputStream.close();
+
+    // First round of reads - we should be able to read the first (valid) block
+    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+
+    assertTrue(reader.hasNext(), "First block should be available");
+    reader.next();
+
+    return reader;
+  }
 }