You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@hudi.apache.org by na...@apache.org on 2021/01/19 07:07:41 UTC
[hudi] branch master updated: [HUDI-1532] Fixed suboptimal implementation of a magic sequence search (#2440)
This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a38612b [HUDI-1532] Fixed suboptimal implementation of a magic sequence search (#2440)
a38612b is described below
commit a38612b10f6ae04644519270f9b5eb631a77c148
Author: Volodymyr Burenin <vb...@gmail.com>
AuthorDate: Tue Jan 19 01:07:27 2021 -0600
[HUDI-1532] Fixed suboptimal implementation of a magic sequence search (#2440)
* Fixed suboptimal implementation of a magic sequence search on GCS.
* Fix comparison.
* Added a buffered reader around input streams from pluggable storage plugins, such as GCS.
* 1. Corrected some comments. 2. Refactored the GCS input stream check.
Co-authored-by: volodymyr.burenin <vo...@cloudkitchens.com>
Co-authored-by: Nishith Agarwal <na...@uber.com>
---
.../java/org/apache/hudi/common/fs/FSUtils.java | 22 ++++++++--------
.../hudi/common/table/log/HoodieLogFileReader.java | 30 +++++++++++++++-------
2 files changed, 31 insertions(+), 21 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 1990c0a..fb36bfe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -18,6 +18,15 @@
package org.apache.hudi.common.fs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -31,16 +40,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.InvalidHoodiePathException;
import org.apache.hudi.metadata.HoodieTableMetadata;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -50,11 +49,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
-import java.util.Set;
-
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.regex.Matcher;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 5bd43ac..e437b78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -57,6 +58,7 @@ import java.util.Objects;
public class HoodieLogFileReader implements HoodieLogFormat.Reader {
public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
+ private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
private final FSDataInputStream inputStream;
@@ -71,9 +73,13 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private transient Thread shutdownThread = null;
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
- boolean readBlockLazily, boolean reverseReader) throws IOException {
+ boolean readBlockLazily, boolean reverseReader) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
- if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
+ if (FSUtils.isGCSInputStream(fsDataInputStream)) {
+ this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+ new BufferedFSInputStream((FSInputStream) ((
+ (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
+ } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
} else {
@@ -274,19 +280,25 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
}
private long scanForNextAvailableBlockOffset() throws IOException {
+ // Make buffer large enough to scan through the file as quick as possible especially if it is on S3/GCS.
+ byte[] dataBuf = new byte[BLOCK_SCAN_READ_BUFFER_SIZE];
+ boolean eof = false;
while (true) {
long currentPos = inputStream.getPos();
try {
- boolean hasNextMagic = hasNextMagic();
- if (hasNextMagic) {
- return currentPos;
- } else {
- // No luck - advance and try again
- inputStream.seek(currentPos + 1);
- }
+ Arrays.fill(dataBuf, (byte) 0);
+ inputStream.readFully(dataBuf, 0, dataBuf.length);
} catch (EOFException e) {
+ eof = true;
+ }
+ long pos = Bytes.indexOf(dataBuf, HoodieLogFormat.MAGIC);
+ if (pos >= 0) {
+ return currentPos + pos;
+ }
+ if (eof) {
return inputStream.getPos();
}
+ inputStream.seek(currentPos + dataBuf.length - HoodieLogFormat.MAGIC.length);
}
}