You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/11/17 18:59:51 UTC
[hudi] branch master updated: [HUDI-2716] InLineFS support for S3FS logs (#3977)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f715cf6 [HUDI-2716] InLineFS support for S3FS logs (#3977)
f715cf6 is described below
commit f715cf607f0692e6500a295a0ab5604f3979df3a
Author: Manoj Govindassamy <ma...@gmail.com>
AuthorDate: Wed Nov 17 10:59:38 2021 -0800
[HUDI-2716] InLineFS support for S3FS logs (#3977)
---
.../hudi/common/fs/inline/InLineFSUtils.java | 65 +++++++++++++---------
.../hudi/common/fs/inline/InLineFileSystem.java | 4 +-
.../common/table/log/HoodieLogFormatReader.java | 5 +-
.../common/fs/inline/TestInLineFileSystem.java | 58 +++++++++++++++++++
4 files changed, 104 insertions(+), 28 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
index e4570f9..a2c60bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
@@ -19,6 +19,9 @@
package org.apache.hudi.common.fs.inline;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.io.File;
/**
* Utils to parse InLineFileSystem paths.
@@ -29,46 +32,58 @@ import org.apache.hadoop.fs.Path;
public class InLineFSUtils {
private static final String START_OFFSET_STR = "start_offset";
private static final String LENGTH_STR = "length";
+ private static final String PATH_SEPARATOR = "/";
+ private static final String SCHEME_SEPARATOR = ":";
private static final String EQUALS_STR = "=";
+ private static final String LOCAL_FILESYSTEM_SCHEME = "file";
/**
- * Fetch inline file path from outer path.
- * Eg
- * Input:
- * Path = s3a://file1, origScheme: file, startOffset = 20, length = 40
- * Output: "inlinefs:/file1/s3a/?start_offset=20&length=40"
+ * Get the InLineFS path for an outer file path and the scheme of that outer file.
+ * <p>
+ * Example:
+ * Input Path: s3a://file1, origScheme: s3a, startOffset = 20, length = 40
+ * Output: "inlinefs://file1/s3a/?start_offset=20&length=40"
*
- * @param outerPath
- * @param origScheme
- * @param inLineStartOffset
- * @param inLineLength
- * @return
+ * @param outerPath The outer file path
+ * @param origScheme The scheme of the outer file path (e.g. s3a, hdfs, file)
+ * @param inLineStartOffset Start offset of the inline file within the outer file
+ * @param inLineLength Length of the inline file
+ * @return InLineFS path for the given outer path, scheme, start offset and length
*/
public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) {
- String subPath = outerPath.toString().substring(outerPath.toString().indexOf(":") + 1);
+ final String subPath = new File(outerPath.toString().substring(outerPath.toString().indexOf(":") + 1)).getPath();
return new Path(
- InLineFileSystem.SCHEME + "://" + subPath + "/" + origScheme
- + "/" + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
+ InLineFileSystem.SCHEME + SCHEME_SEPARATOR + PATH_SEPARATOR + subPath + PATH_SEPARATOR + origScheme
+ + PATH_SEPARATOR + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
+ "&" + LENGTH_STR + EQUALS_STR + inLineLength
);
}
/**
- * Inline file format
- * "inlinefs://<path_to_outer_file>/<outer_file_scheme>/?start_offset=start_offset>&length=<length>"
- * Outer File format
- * "<outer_file_scheme>://<path_to_outer_file>"
+ * InLineFS path format:
+ * "inlinefs://path/to/outer/file/outer_file_scheme/?start_offset=<start_offset>&length=<length>"
* <p>
- * Eg input : "inlinefs://file1/sa3/?start_offset=20&length=40".
- * Output : "sa3://file1"
+ * Outer file path format:
+ * "outer_file_scheme://path/to/outer/file"
+ * <p>
+ * Example:
+ * Input: "inlinefs://file1/s3a/?start_offset=20&length=40"
+ * Output: "s3a://file1"
*
- * @param inlinePath inline file system path
- * @return
+ * @param inlineFSPath InLineFS path from which the outer file path is derived
+ * @return Outer file path encoded in the given InLineFS path
*/
- public static Path getOuterfilePathFromInlinePath(Path inlinePath) {
- String scheme = inlinePath.getParent().getName();
- Path basePath = inlinePath.getParent().getParent();
- return new Path(basePath.toString().replaceFirst(InLineFileSystem.SCHEME, scheme));
+ public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
+ final String scheme = inlineFSPath.getParent().getName();
+ final Path basePath = inlineFSPath.getParent().getParent();
+ ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
+ "Invalid InLineFSPath: " + inlineFSPath);
+
+ final String pathExceptScheme = basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) + 1);
+ final String fullPath = scheme + SCHEME_SEPARATOR
+ + (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
+ + pathExceptScheme;
+ return new Path(fullPath);
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java
index 4c693c5..712b6c7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFileSystem.java
@@ -63,7 +63,7 @@ public class InLineFileSystem extends FileSystem {
@Override
public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException {
- Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
+ Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
FileSystem outerFs = outerPath.getFileSystem(conf);
FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize);
return new InLineFsDataInputStream(InLineFSUtils.startOffset(inlinePath), outerStream, InLineFSUtils.length(inlinePath));
@@ -80,7 +80,7 @@ public class InLineFileSystem extends FileSystem {
@Override
public FileStatus getFileStatus(Path inlinePath) throws IOException {
- Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
+ Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
FileSystem outerFs = outerPath.getFileSystem(conf);
FileStatus status = outerFs.getFileStatus(outerPath);
FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 36fa187..febdbf8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -44,6 +44,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final Schema readerSchema;
private final boolean readBlocksLazily;
private final boolean reverseLogReader;
+ private final boolean enableInLineReading;
private int bufferSize;
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
@@ -62,6 +63,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.reverseLogReader = reverseLogReader;
this.bufferSize = bufferSize;
this.prevReadersInOpenState = new ArrayList<>();
+ this.enableInLineReading = enableInlineReading;
if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
@@ -104,7 +106,8 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.prevReadersInOpenState.add(currentReader);
}
this.currentReader =
- new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
+ new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
+ this.enableInLineReading);
} catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
index 4553aa5..92f83aa 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
@@ -296,6 +296,64 @@ public class TestInLineFileSystem {
}, "Should have thrown exception");
}
+ static class TestFSPath {
+ final Path inputPath;
+ final Path expectedInLineFSPath;
+ final Path transformedInputPath;
+
+ TestFSPath(final Path inputPath, final Path expectedInLineFSPath, final Path transformedInputPath) {
+ this.inputPath = inputPath;
+ this.expectedInLineFSPath = expectedInLineFSPath;
+ this.transformedInputPath = transformedInputPath;
+ }
+ }
+
+ @Test
+ public void testInLineFSPathConversions() {
+ final List<TestFSPath> expectedInLinePaths = Arrays.asList(
+ new TestFSPath(
+ new Path("/zero/524bae7e-f01d-47ae-b7cd-910400a81336"),
+ new Path("inlinefs://zero/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
+ new Path("file:/zero/524bae7e-f01d-47ae-b7cd-910400a81336")),
+ new TestFSPath(
+ new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336"),
+ new Path("inlinefs://one/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
+ new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336")),
+ new TestFSPath(
+ new Path("file://two/524bae7e-f01d-47ae-b7cd-910400a81336"),
+ new Path("inlinefs://two/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
+ new Path("file:/two/524bae7e-f01d-47ae-b7cd-910400a81336")),
+ new TestFSPath(
+ new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336"),
+ new Path("inlinefs://three/524bae7e-f01d-47ae-b7cd-910400a81336/hdfs/?start_offset=10&length=10"),
+ new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336")),
+ new TestFSPath(
+ new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336"),
+ new Path("inlinefs://four/524bae7e-f01d-47ae-b7cd-910400a81336/s3/?start_offset=10&length=10"),
+ new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336")),
+ new TestFSPath(
+ new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"),
+ new Path("inlinefs://five/524bae7e-f01d-47ae-b7cd-910400a81336/s3a/?start_offset=10&length=10"),
+ new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"))
+ );
+
+ for (TestFSPath entry : expectedInLinePaths) {
+ final Path inputPath = entry.inputPath;
+ final Path expectedInLineFSPath = entry.expectedInLineFSPath;
+ final Path expectedTransformedInputPath = entry.transformedInputPath;
+
+ String scheme = "file";
+ if (inputPath.toString().contains(":")) {
+ scheme = inputPath.toString().split(":")[0];
+ }
+ final Path actualInLineFSPath = InLineFSUtils.getInlineFilePath(inputPath, scheme, 10, 10);
+ assertEquals(expectedInLineFSPath, actualInLineFSPath);
+
+ final Path actualOuterFilePath = InLineFSUtils.getOuterFilePathFromInlinePath(actualInLineFSPath);
+ assertEquals(expectedTransformedInputPath, actualOuterFilePath);
+ }
+ }
+
@Test
public void testExists() throws IOException {
Path inlinePath = getRandomInlinePath();