Posted to commits@hudi.apache.org by na...@apache.org on 2021/01/19 07:07:41 UTC

[hudi] branch master updated: [HUDI-1532] Fixed suboptimal implementation of a magic sequence search (#2440)

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a38612b  [HUDI-1532] Fixed suboptimal implementation of a magic sequence search  (#2440)
a38612b is described below

commit a38612b10f6ae04644519270f9b5eb631a77c148
Author: Volodymyr Burenin <vb...@gmail.com>
AuthorDate: Tue Jan 19 01:07:27 2021 -0600

    [HUDI-1532] Fixed suboptimal implementation of a magic sequence search  (#2440)
    
    * Fixed suboptimal implementation of a magic sequence search on GCS.
    
    * Fix comparison.
    
    * Added a buffered reader around pluggable storage streams such as GCS.
    
    * 1. Corrected some comments. 2. Refactored the GCS input stream check.
    
    Co-authored-by: volodymyr.burenin <vo...@cloudkitchens.com>
    Co-authored-by: Nishith Agarwal <na...@uber.com>
---
 .../java/org/apache/hudi/common/fs/FSUtils.java    | 22 ++++++++--------
 .../hudi/common/table/log/HoodieLogFileReader.java | 30 +++++++++++++++-------
 2 files changed, 31 insertions(+), 21 deletions(-)
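
A note on what the HoodieLogFileReader change below amounts to: the removed lines probed for the log-block magic one position at a time (hasNextMagic(), then seek(currentPos + 1)), which on an object store such as GCS can turn into a tiny remote read for every byte scanned. The new lines read the file in 1 MB chunks and search each chunk in memory with org.apache.hadoop.hbase.util.Bytes.indexOf. A minimal sketch of that in-memory search step, with an illustrative magic value (the reader itself uses HoodieLogFormat.MAGIC) and assuming hbase-common is on the classpath:

    import org.apache.hadoop.hbase.util.Bytes;

    public class MagicIndexOfSketch {
      public static void main(String[] args) {
        byte[] magic = {'#', 'H', 'U', 'D', 'I', '#'};   // illustrative; the reader uses HoodieLogFormat.MAGIC
        byte[] chunk = new byte[1024 * 1024];            // one buffered read's worth of log data
        System.arraycopy(magic, 0, chunk, 700_000, magic.length);

        // Bytes.indexOf returns the offset of the first occurrence of the target, or -1,
        // so a single call replaces up to a megabyte of byte-by-byte probing.
        int pos = Bytes.indexOf(chunk, magic);
        System.out.println(pos); // 700000
      }
    }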

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 1990c0a..fb36bfe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -18,6 +18,15 @@
 
 package org.apache.hudi.common.fs;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -31,16 +40,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.InvalidHoodiePathException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -50,11 +49,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.Set;
-
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.regex.Matcher;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 5bd43ac..e437b78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -57,6 +58,7 @@ import java.util.Objects;
 public class HoodieLogFileReader implements HoodieLogFormat.Reader {
 
   public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
+  private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
   private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
 
   private final FSDataInputStream inputStream;
@@ -71,9 +73,13 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   private transient Thread shutdownThread = null;
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-      boolean readBlockLazily, boolean reverseReader) throws IOException {
+                             boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
-    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
+    if (FSUtils.isGCSInputStream(fsDataInputStream)) {
+      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) ((
+              (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
+    } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
       this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
           new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
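
For readers skimming the constructor change above: FSUtils.isGCSInputStream (whose body is not among the hunks quoted in this message) detects the case where fs.open() handed back the GCS connector's stream, and the casts show that this stream wraps another FSDataInputStream, so the code unwraps two levels before inserting the BufferedFSInputStream. That places the read buffer directly on top of the remote FSInputStream instead of leaving reads unbuffered. A sketch of the branching, with the GCS check passed in as a flag and Hudi's TimedFSDataInputStream wrapper omitted for brevity:

    import org.apache.hadoop.fs.BufferedFSInputStream;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSInputStream;

    // Sketch only: mirrors the three branches of the constructor above.
    public final class BufferedStreamSelector {

      static FSDataInputStream wrapForScan(FSDataInputStream opened, int bufferSize, boolean isGcsStream) {
        if (isGcsStream) {
          // The GCS connector's stream wraps another FSDataInputStream, so unwrap two
          // levels and buffer the underlying remote FSInputStream.
          FSInputStream remote =
              (FSInputStream) ((FSDataInputStream) opened.getWrappedStream()).getWrappedStream();
          return new FSDataInputStream(new BufferedFSInputStream(remote, bufferSize));
        }
        if (opened.getWrappedStream() instanceof FSInputStream) {
          // Usual case: buffer the wrapped FSInputStream directly.
          return new FSDataInputStream(
              new BufferedFSInputStream((FSInputStream) opened.getWrappedStream(), bufferSize));
        }
        // Otherwise keep the stream exactly as opened.
        return opened;
      }
    }
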
@@ -274,19 +280,25 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   }
 
   private long scanForNextAvailableBlockOffset() throws IOException {
+    // Make the buffer large enough to scan through the file as quickly as possible, especially if it is on S3/GCS.
+    byte[] dataBuf = new byte[BLOCK_SCAN_READ_BUFFER_SIZE];
+    boolean eof = false;
     while (true) {
       long currentPos = inputStream.getPos();
       try {
-        boolean hasNextMagic = hasNextMagic();
-        if (hasNextMagic) {
-          return currentPos;
-        } else {
-          // No luck - advance and try again
-          inputStream.seek(currentPos + 1);
-        }
+        Arrays.fill(dataBuf, (byte) 0);
+        inputStream.readFully(dataBuf, 0, dataBuf.length);
       } catch (EOFException e) {
+        eof = true;
+      }
+      long pos = Bytes.indexOf(dataBuf, HoodieLogFormat.MAGIC);
+      if (pos >= 0) {
+        return currentPos + pos;
+      }
+      if (eof) {
         return inputStream.getPos();
       }
+      inputStream.seek(currentPos + dataBuf.length - HoodieLogFormat.MAGIC.length);
     }
   }
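
Two details of the new scan loop are easy to miss. The buffer is zeroed before each read so that, when readFully reaches end of file and throws EOFException after a partial read, stale bytes from the previous iteration cannot produce a spurious match; whatever was read before EOF is still searched before the method returns. And the final seek(currentPos + dataBuf.length - HoodieLogFormat.MAGIC.length) deliberately rewinds by the magic's length, so consecutive 1 MB reads overlap and a magic sequence that straddles two reads is still found. A standalone sketch of the same chunk-plus-overlap scan against an in-memory byte array (the chunk size, magic value, and naive indexOf are illustrative stand-ins for the real constants and for Bytes.indexOf):

    import java.util.Arrays;

    public final class ChunkedMagicScan {

      private static final byte[] MAGIC = {'#', 'H', 'U', 'D', 'I', '#'}; // illustrative
      private static final int CHUNK = 1024;                              // stands in for the 1 MB buffer

      static int scanForNextMagic(byte[] file, int startPos) {
        int pos = startPos;
        while (pos < file.length) {
          int len = Math.min(CHUNK, file.length - pos);
          byte[] chunk = Arrays.copyOfRange(file, pos, pos + len);
          int hit = indexOf(chunk, MAGIC);
          if (hit >= 0) {
            return pos + hit;            // absolute offset where the next block starts
          }
          if (len < CHUNK) {
            return file.length;          // reached end of file without another block
          }
          pos += CHUNK - MAGIC.length;   // overlap so a magic straddling two chunks is found
        }
        return file.length;
      }

      // Naive stand-in for org.apache.hadoop.hbase.util.Bytes.indexOf.
      private static int indexOf(byte[] array, byte[] target) {
        outer:
        for (int i = 0; i <= array.length - target.length; i++) {
          for (int j = 0; j < target.length; j++) {
            if (array[i + j] != target[j]) {
              continue outer;
            }
          }
          return i;
        }
        return -1;
      }

      public static void main(String[] args) {
        byte[] file = new byte[5000];
        System.arraycopy(MAGIC, 0, file, 1022, MAGIC.length); // straddles the first 1024-byte chunk
        System.out.println(scanForNextMagic(file, 0));        // prints 1022
      }
    }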