You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@hudi.apache.org by si...@apache.org on 2021/11/17 18:59:51 UTC

[hudi] branch master updated: [HUDI-2716] InLineFS support for S3FS logs (#3977)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f715cf6  [HUDI-2716] InLineFS support for S3FS logs (#3977)
f715cf6 is described below

commit f715cf607f0692e6500a295a0ab5604f3979df3a
Author: Manoj Govindassamy <ma...@gmail.com>
AuthorDate: Wed Nov 17 10:59:38 2021 -0800

    [HUDI-2716] InLineFS support for S3FS logs (#3977)
---
 .../hudi/common/fs/inline/InLineFSUtils.java       | 65 +++++++++++++---------
 .../hudi/common/fs/inline/InLineFileSystem.java    |  4 +-
 .../common/table/log/HoodieLogFormatReader.java    |  5 +-
 .../common/fs/inline/TestInLineFileSystem.java     | 58 +++++++++++++++++++
 4 files changed, 104 insertions(+), 28 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
index e4570f9..a2c60bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
@@ -19,6 +19,9 @@
 package org.apache.hudi.common.fs.inline;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.io.File;
 
 /**
  * Utils to parse InLineFileSystem paths.
@@ -29,46 +32,58 @@ import org.apache.hadoop.fs.Path;
 public class InLineFSUtils {
   private static final String START_OFFSET_STR = "start_offset";
   private static final String LENGTH_STR = "length";
+  private static final String PATH_SEPARATOR = "/";
+  private static final String SCHEME_SEPARATOR = ":";
   private static final String EQUALS_STR = "=";
+  private static final String LOCAL_FILESYSTEM_SCHEME = "file";
 
   /**
-   * Fetch inline file path from outer path.
-   * Eg
-   * Input:
-   * Path = s3a://file1, origScheme: file, startOffset = 20, length = 40
-   * Output: "inlinefs:/file1/s3a/?start_offset=20&length=40"
+   * Get the InlineFS Path for a given schema and its Path.
+   * <p>
+   * Examples:
+   * Input Path: s3a://file1, origScheme: file, startOffset = 20, length = 40
+   * Output: "inlinefs://file1/s3a/?start_offset=20&length=40"
    *
-   * @param outerPath
-   * @param origScheme
-   * @param inLineStartOffset
-   * @param inLineLength
-   * @return
+   * @param outerPath         The outer file Path
+   * @param origScheme        The file schema
+   * @param inLineStartOffset Start offset for the inline file
+   * @param inLineLength      Length for the inline file
+   * @return InlineFS Path for the requested outer path and schema
    */
   public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) {
-    String subPath = outerPath.toString().substring(outerPath.toString().indexOf(":") + 1);
+    final String subPath = new File(outerPath.toString().substring(outerPath.toString().indexOf(":") + 1)).getPath();
     return new Path(
-        InLineFileSystem.SCHEME + "://" + subPath + "/" + origScheme
-            + "/" + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
+        InLineFileSystem.SCHEME + SCHEME_SEPARATOR + PATH_SEPARATOR + subPath + PATH_SEPARATOR + origScheme
+            + PATH_SEPARATOR + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
             + "&" + LENGTH_STR + EQUALS_STR + inLineLength
     );
   }
 
   /**
-   * Inline file format
-   * "inlinefs://<path_to_outer_file>/<outer_file_scheme>/?start_offset=start_offset>&length=<length>"
-   * Outer File format
-   * "<outer_file_scheme>://<path_to_outer_file>"
+   * InlineFS Path format:
+   * "inlinefs://path/to/outer/file/outer_file_schema/?start_offset=start_offset>&length=<length>"
    * <p>
-   * Eg input : "inlinefs://file1/sa3/?start_offset=20&length=40".
-   * Output : "sa3://file1"
+   * Outer File Path format:
+   * "outer_file_schema://path/to/outer/file"
+   * <p>
+   * Example
+   * Input: "inlinefs://file1/s3a/?start_offset=20&length=40".
+   * Output: "s3a://file1"
    *
-   * @param inlinePath inline file system path
-   * @return
+   * @param inlineFSPath InLineFS Path to get the outer file Path
+   * @return Outer file Path from the InLineFS Path
    */
-  public static Path getOuterfilePathFromInlinePath(Path inlinePath) {
-    String scheme = inlinePath.getParent().getName();
-    Path basePath = inlinePath.getParent().getParent();
-    return new Path(basePath.toString().replaceFirst(InLineFileSystem.SCHEME, scheme));
+  public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
+    final String scheme = inlineFSPath.getParent().getName();
+    final Path basePath = inlineFSPath.getParent().getParent();
+    ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
+        "Invalid InLineFSPath: " + inlineFSPath);
+
+    final String pathExceptScheme = basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) + 1);
+    final String fullPath = scheme + SCHEME_SEPARATOR
+        + (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
+        + pathExceptScheme;
+    return new Path(fullPath);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java
index 4c693c5..712b6c7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java
@@ -63,7 +63,7 @@ public class InLineFileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException {
-    Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
+    Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
     FileSystem outerFs = outerPath.getFileSystem(conf);
     FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize);
     return new InLineFsDataInputStream(InLineFSUtils.startOffset(inlinePath), outerStream, InLineFSUtils.length(inlinePath));
@@ -80,7 +80,7 @@ public class InLineFileSystem extends FileSystem {
 
   @Override
   public FileStatus getFileStatus(Path inlinePath) throws IOException {
-    Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
+    Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
     FileSystem outerFs = outerPath.getFileSystem(conf);
     FileStatus status = outerFs.getFileStatus(outerPath);
     FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 36fa187..febdbf8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -44,6 +44,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
   private final Schema readerSchema;
   private final boolean readBlocksLazily;
   private final boolean reverseLogReader;
+  private final boolean enableInLineReading;
   private int bufferSize;
 
   private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
@@ -62,6 +63,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
     this.reverseLogReader = reverseLogReader;
     this.bufferSize = bufferSize;
     this.prevReadersInOpenState = new ArrayList<>();
+    this.enableInLineReading = enableInlineReading;
     if (logFiles.size() > 0) {
       HoodieLogFile nextLogFile = logFiles.remove(0);
       this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
@@ -104,7 +106,8 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
           this.prevReadersInOpenState.add(currentReader);
         }
         this.currentReader =
-            new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
+            new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
+                this.enableInLineReading);
       } catch (IOException io) {
         throw new HoodieIOException("unable to initialize read with log file ", io);
       }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
index 4553aa5..92f83aa 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
@@ -296,6 +296,64 @@ public class TestInLineFileSystem {
     }, "Should have thrown exception");
   }
 
+  static class TestFSPath {
+    final Path inputPath;
+    final Path expectedInLineFSPath;
+    final Path transformedInputPath;
+
+    TestFSPath(final Path inputPath, final Path expectedInLineFSPath, final Path transformedInputPath) {
+      this.inputPath = inputPath;
+      this.expectedInLineFSPath = expectedInLineFSPath;
+      this.transformedInputPath = transformedInputPath;
+    }
+  }
+
+  @Test
+  public void testInLineFSPathConversions() {
+    final List<TestFSPath> expectedInLinePaths = Arrays.asList(
+        new TestFSPath(
+            new Path("/zero/524bae7e-f01d-47ae-b7cd-910400a81336"),
+            new Path("inlinefs://zero/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
+            new Path("file:/zero/524bae7e-f01d-47ae-b7cd-910400a81336")),
+        new TestFSPath(
+            new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336"),
+            new Path("inlinefs://one/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
+            new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336")),
+        new TestFSPath(
+            new Path("file://two/524bae7e-f01d-47ae-b7cd-910400a81336"),
+            new Path("inlinefs://two/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
+            new Path("file:/two/524bae7e-f01d-47ae-b7cd-910400a81336")),
+        new TestFSPath(
+            new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336"),
+            new Path("inlinefs://three/524bae7e-f01d-47ae-b7cd-910400a81336/hdfs/?start_offset=10&length=10"),
+            new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336")),
+        new TestFSPath(
+            new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336"),
+            new Path("inlinefs://four/524bae7e-f01d-47ae-b7cd-910400a81336/s3/?start_offset=10&length=10"),
+            new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336")),
+        new TestFSPath(
+            new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"),
+            new Path("inlinefs://five/524bae7e-f01d-47ae-b7cd-910400a81336/s3a/?start_offset=10&length=10"),
+            new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"))
+    );
+
+    for (TestFSPath entry : expectedInLinePaths) {
+      final Path inputPath = entry.inputPath;
+      final Path expectedInLineFSPath = entry.expectedInLineFSPath;
+      final Path expectedTransformedInputPath = entry.transformedInputPath;
+
+      String scheme = "file";
+      if (inputPath.toString().contains(":")) {
+        scheme = inputPath.toString().split(":")[0];
+      }
+      final Path actualInLineFSPath = InLineFSUtils.getInlineFilePath(inputPath, scheme, 10, 10);
+      assertEquals(expectedInLineFSPath, actualInLineFSPath);
+
+      final Path actualOuterFilePath = InLineFSUtils.getOuterFilePathFromInlinePath(actualInLineFSPath);
+      assertEquals(expectedTransformedInputPath, actualOuterFilePath);
+    }
+  }
+
   @Test
   public void testExists() throws IOException {
     Path inlinePath = getRandomInlinePath();