Posted to commits@hudi.apache.org by si...@apache.org on 2021/10/25 05:21:23 UTC

[hudi] branch master updated: [HUDI-2005] Avoiding direct fs calls in HoodieLogFileReader (#3757)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb0532  [HUDI-2005] Avoiding direct fs calls in HoodieLogFileReader (#3757)
1bb0532 is described below

commit 1bb05325637740498cac548872cf7223e34950d0
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Mon Oct 25 01:21:08 2021 -0400

    [HUDI-2005] Avoiding direct fs calls in HoodieLogFileReader (#3757)
---
 .../hudi/common/table/log/HoodieLogFileReader.java     | 12 ++++++------
 .../apache/hudi/common/table/log/HoodieLogFormat.java  |  2 +-
 .../hudi/common/table/log/HoodieLogFormatReader.java   |  4 ++--
 .../apache/hudi/common/table/log/LogReaderUtils.java   | 18 +++++++++++-------
 .../hudi/metadata/HoodieMetadataFileSystemView.java    |  2 +-
 .../hadoop/realtime/AbstractRealtimeRecordReader.java  |  2 +-
 .../hudi/hadoop/realtime/HoodieRealtimeFileSplit.java  | 12 ++++++++++--
 .../realtime/RealtimeBootstrapBaseFileSplit.java       | 13 +++++++++++--
 .../org/apache/hudi/hadoop/realtime/RealtimeSplit.java |  3 +++
 .../hadoop/utils/HoodieRealtimeInputFormatUtils.java   |  5 +++--
 .../hadoop/realtime/TestHoodieRealtimeFileSplit.java   |  5 ++++-
 .../realtime/TestHoodieRealtimeRecordReader.java       | 17 +++++++++--------
 12 files changed, 62 insertions(+), 33 deletions(-)
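
The crux of the change is to stop issuing a FileSystem.getFileStatus() call when the log file's length is already known, and to carry that length (via FileStatus) through the realtime splits so readers never need to look it up again. A minimal before/after sketch of the pattern, with the surrounding code simplified rather than copied from the Hudi source:

    // before: a direct fs call per log file just to learn its length
    long len = fs.getFileStatus(logFile.getPath()).getLen();

    // after: reuse the size already carried on the HoodieLogFile instance
    long len = logFile.getFileSize();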

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index f0f3842..88b7e32 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -74,6 +74,11 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   private transient Thread shutdownThread = null;
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
+                             boolean readBlockLazily) throws IOException {
+    this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false);
+  }
+
+  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
                              boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
     this.logFile = logFile;
@@ -82,16 +87,11 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
     this.readBlockLazily = readBlockLazily;
     this.reverseReader = reverseReader;
     if (this.reverseReader) {
-      this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
+      this.reverseLogFilePosition = this.lastReverseLogFilePosition = logFile.getFileSize();
     }
     addShutDownHook();
   }
 
-  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, boolean readBlockLazily,
-      boolean reverseReader) throws IOException {
-    this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, readBlockLazily, reverseReader);
-  }
-
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException {
     this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
   }
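
With the constructors reshuffled above, forward reading uses the shorter overload and reverse reading stays an explicit opt-in. A hedged usage sketch (variable names illustrative):

    // forward reading; reverseReader defaults to false in the new overload
    HoodieLogFormat.Reader reader =
        new HoodieLogFileReader(fs, logFile, readerSchema, bufferSize, readBlocksLazily);
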
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index c566788..569b4a2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -274,7 +274,7 @@ public interface HoodieLogFormat {
 
   static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema)
       throws IOException {
-    return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false);
+    return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
   }
 
   static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 7267227..e64e1a1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -59,7 +59,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
     this.prevReadersInOpenState = new ArrayList<>();
     if (logFiles.size() > 0) {
       HoodieLogFile nextLogFile = logFiles.remove(0);
-      this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
+      this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily);
     }
   }
 
@@ -99,7 +99,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
           this.prevReadersInOpenState.add(currentReader);
         }
         this.currentReader =
-            new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
+            new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily);
       } catch (IOException io) {
         throw new HoodieIOException("unable to initialize read with log file ", io);
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index fe159df..c2a0396 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -27,14 +27,16 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -42,9 +44,10 @@ import java.util.stream.Collectors;
  */
 public class LogReaderUtils {
 
-  private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, Path path)
+  private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, FileStatus logPathFileStatus)
       throws IOException {
-    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true, true);
+    // set length for the HoodieLogFile as it will be leveraged by HoodieLogFormat.Reader with reverseReading enabled
+    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(logPathFileStatus.getPath(), logPathFileStatus.getLen()), null, true, true);
     Schema writerSchema = null;
     HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
     while (reader.hasPrev()) {
@@ -62,17 +65,19 @@ public class LogReaderUtils {
     return writerSchema;
   }
 
-  public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, Configuration config)
+  public static Schema readLatestSchemaFromLogFiles(String basePath, List<FileStatus> deltaFileStatus, Configuration config)
       throws IOException {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(config).setBasePath(basePath).build();
-    List<String> deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s)))
+    List<String> deltaPaths = deltaFileStatus.stream().map(s -> new HoodieLogFile(s.getPath()))
         .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString())
         .collect(Collectors.toList());
+    Map<String, FileStatus> deltaFilePathToFileStatus = deltaFileStatus.stream().map(entry -> Pair.of(entry.getPath().toString(), entry))
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
     if (deltaPaths.size() > 0) {
       for (String logPath : deltaPaths) {
         FileSystem fs = FSUtils.getFs(logPath, config);
         Schema schemaFromLogFile =
-            readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), new Path(logPath));
+            readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), deltaFilePathToFileStatus.get(logPath));
         if (schemaFromLogFile != null) {
           return schemaFromLogFile;
         }
@@ -80,5 +85,4 @@ public class LogReaderUtils {
     }
     return null;
   }
-
 }
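
LogReaderUtils now receives the FileStatus so the reverse reader can be seeded with the known file length instead of asking the file system again. A short sketch of the pattern used above, assuming a FileStatus named logPathFileStatus:

    // the explicit length lets reverse reading start from EOF without an fs.getFileStatus() call
    HoodieLogFile logFile = new HoodieLogFile(logPathFileStatus.getPath(), logPathFileStatus.getLen());
    HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, logFile, null, true, true);
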
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index 453ec8f..a918055 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -61,7 +61,7 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
    * @throws IOException
    */
   @Override
-  protected FileStatus[] listPartition(Path partitionPath) throws IOException {
+  public FileStatus[] listPartition(Path partitionPath) throws IOException {
     return tableMetadata.getAllFilesInPartition(partitionPath);
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index ef3d4f1..45c01ea 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -82,7 +82,7 @@ public abstract class AbstractRealtimeRecordReader {
    * job conf.
    */
   private void init() throws IOException {
-    Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
+    Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFileStatus(), jobConf);
     if (schemaFromLogFile == null) {
       writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf);
       LOG.info("Writer Schema From Parquet => " + writerSchema.getFields());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
index 6423f2c..3d9b62f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
@@ -20,12 +20,14 @@ package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.common.util.Option;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapred.FileSplit;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Filesplit that wraps the base split and a list of log files to merge deltas from.
@@ -33,6 +35,7 @@ import java.util.List;
 public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit {
 
   private List<String> deltaLogPaths;
+  private List<FileStatus> deltaLogFileStatus;
 
   private String maxCommitTime;
 
@@ -44,11 +47,12 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
     super();
   }
 
-  public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime,
+  public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<FileStatus> deltaLogFileStatus, String maxCommitTime,
                                  Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo)
       throws IOException {
     super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
-    this.deltaLogPaths = deltaLogPaths;
+    this.deltaLogFileStatus = deltaLogFileStatus;
+    this.deltaLogPaths = deltaLogFileStatus.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
     this.maxCommitTime = maxCommitTime;
     this.basePath = basePath;
     this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
@@ -58,6 +62,10 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
     return deltaLogPaths;
   }
 
+  public List<FileStatus> getDeltaLogFileStatus() {
+    return deltaLogFileStatus;
+  }
+
   public String getMaxCommitTime() {
     return maxCommitTime;
   }
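
Callers now build realtime splits from FileStatus entries, and the split derives the path strings itself, so getDeltaLogPaths() keeps returning the same values as before. A hedged construction sketch based on the HoodieRealtimeInputFormatUtils change further below (local names illustrative):

    List<FileStatus> deltaLogFileStatus = fileSlice.getLogFiles()
        .sorted(HoodieLogFile.getLogFileComparator())
        .map(HoodieLogFile::getFileStatus)
        .collect(Collectors.toList());
    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
        baseSplit, basePath, deltaLogFileStatus, maxCommitTime, Option.empty());
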
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java
index 4da310d..f9b0bd0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java
@@ -21,12 +21,14 @@ package org.apache.hudi.hadoop.realtime;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapred.FileSplit;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Realtime File Split with external base file.
@@ -34,6 +36,7 @@ import java.util.List;
 public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit {
 
   private List<String> deltaLogPaths;
+  private List<FileStatus> deltaLogFileStatus;
 
   private String maxInstantTime;
 
@@ -43,11 +46,12 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
     super();
   }
 
-  public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths,
+  public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<FileStatus> deltaLogFileStatus,
                                         String maxInstantTime, FileSplit externalFileSplit) throws IOException {
     super(baseSplit, externalFileSplit);
     this.maxInstantTime = maxInstantTime;
-    this.deltaLogPaths = deltaLogPaths;
+    this.deltaLogFileStatus = deltaLogFileStatus;
+    this.deltaLogPaths = deltaLogFileStatus.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
     this.basePath = basePath;
   }
 
@@ -69,6 +73,11 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
   }
 
   @Override
+  public List<FileStatus> getDeltaLogFileStatus() {
+    return deltaLogFileStatus;
+  }
+
+  @Override
   public String getMaxCommitTime() {
     return maxInstantTime;
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
index 108613c..6dfaf16 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.realtime;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.InputSplitUtils;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
 
@@ -41,6 +42,8 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
    */
   List<String> getDeltaLogPaths();
 
+  List<FileStatus> getDeltaLogFileStatus();
+
   /**
    * Return Max Instant Time.
    * @return
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index f84e344..9cf61b2 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -43,6 +43,7 @@ import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
 import org.apache.hudi.hadoop.realtime.RealtimeSplit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.FileSplit;
@@ -130,8 +131,8 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
           List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
           dataFileSplits.forEach(split -> {
             try {
-              List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
+              List<FileStatus> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+                  .map(logFile -> logFile.getFileStatus()).collect(Collectors.toList());
               if (split instanceof BootstrapBaseFileSplit) {
                 BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
                 String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
index ac85786..06f7b72 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.common.util.Option;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileSplit;
@@ -56,6 +57,7 @@ public class TestHoodieRealtimeFileSplit {
 
   private HoodieRealtimeFileSplit split;
   private String basePath;
+  private List<FileStatus> deltaLogFileStatus;
   private List<String> deltaLogPaths;
   private String fileSplitName;
   private FileSplit baseFileSplit;
@@ -64,12 +66,13 @@ public class TestHoodieRealtimeFileSplit {
   @BeforeEach
   public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception {
     basePath = tempDir.toAbsolutePath().toString();
+    deltaLogFileStatus = Collections.singletonList(new FileStatus(0L, false, 0, 0L, 0, new Path(basePath + "/1.log")));
     deltaLogPaths = Collections.singletonList(basePath + "/1.log");
     fileSplitName = basePath + "/test.file";
     baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {});
     maxCommitTime = "10001";
 
-    split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime, Option.empty());
+    split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogFileStatus, maxCommitTime, Option.empty());
   }
 
   @Test
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index a647da9..f0c1ab1 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -187,7 +188,7 @@ public class TestHoodieRealtimeRecordReader {
         HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
             new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf),
             basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-            .map(h -> h.getPath().toString()).collect(Collectors.toList()),
+            .map(h -> new FileStatus(0L, false, 0, 0L, 0, h.getPath())).collect(Collectors.toList()),
             instantTime, Option.empty());
 
         // create a RecordReader to be used by HoodieRealtimeRecordReader
@@ -256,10 +257,10 @@ public class TestHoodieRealtimeRecordReader {
     FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime);
 
     // create a split with baseFile (parquet file written earlier) and new log file(s)
-    String logFilePath = writer.getLogFile().getPath().toString();
+    FileStatus logFileFileStatus = new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath());
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
         new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
-        basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty());
+        basePath.toUri().toString(), Collections.singletonList(logFileFileStatus), newCommitTime, Option.empty());
 
     // create a RecordReader to be used by HoodieRealtimeRecordReader
     RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -336,10 +337,10 @@ public class TestHoodieRealtimeRecordReader {
     InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
 
     // create a split with baseFile (parquet file written earlier) and new log file(s)
-    String logFilePath = writer.getLogFile().getPath().toString();
+    FileStatus logFileStatus = new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath());
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
         new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
-        basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty());
+        basePath.toUri().toString(), Collections.singletonList(logFileStatus), newCommitTime, Option.empty());
 
     // create a RecordReader to be used by HoodieRealtimeRecordReader
     RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -449,7 +450,7 @@ public class TestHoodieRealtimeRecordReader {
   public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMap.DiskMapType diskMapType,
                                                                boolean isCompressionEnabled) throws Exception {
     // initial commit
-    List<String> logFilePaths = new ArrayList<>();
+    List<FileStatus> logFilePaths = new ArrayList<>();
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
     HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
     String instantTime = "100";
@@ -470,7 +471,7 @@ public class TestHoodieRealtimeRecordReader {
         InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime,
             numberOfLogRecords, 0, 1);
     long size = writer.getCurrentSize();
-    logFilePaths.add(writer.getLogFile().getPath().toString());
+    logFilePaths.add(new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath()));
     writer.close();
     assertTrue(size > 0, "block - size should be > 0");
 
@@ -478,7 +479,7 @@ public class TestHoodieRealtimeRecordReader {
     newCommitTime = "102";
     writer = InputFormatTestUtil.writeRollbackBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime,
         newCommitTime, "101", 1);
-    logFilePaths.add(writer.getLogFile().getPath().toString());
+    logFilePaths.add(new FileStatus(0L, false, 0, 0L, 0, writer.getLogFile().getPath()));
     writer.close();
     InputFormatTestUtil.deltaCommit(basePath, newCommitTime);