Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/06/02 02:42:16 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #1592: [HUDI-822] decouple Hudi related logics from HoodieInputFormat

vinothchandar commented on a change in pull request #1592:
URL: https://github.com/apache/hudi/pull/1592#discussion_r433588072



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class HoodieInputFormatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
+
+  /**
+   * Filter any specific instants that we do not want to process.
+   * example timeline:
+   *
+   * t0 -> create bucket1.parquet
+   * t1 -> create and append updates bucket1.log
+   * t2 -> request compaction
+   * t3 -> create bucket2.parquet
+   *
+   * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1
+   *
+   * To workaround this problem, we want to stop returning data belonging to commits > t2.
+   * After compaction is complete, incremental reader would see updates in t2, t3, so on.
+   * @param timeline
+   * @return
+   */
+  public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
+    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline
+        .filterPendingCompactionTimeline().firstInstant();
+    if (pendingCompactionInstant.isPresent()) {
+      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline
+          .findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
+      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
+          - instantsTimeline.getCommitsTimeline().countInstants();
+      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
+          + " skipping " + numCommitsFilteredByCompaction + " commits");
+
+      return instantsTimeline;
+    } else {
+      return timeline;
+    }
+  }
+
+  /**
+   * Extract partitions touched by the commitsToCheck.
+   * @param commitsToCheck
+   * @param tableMetaClient
+   * @param timeline
+   * @param inputPaths
+   * @return
+   * @throws IOException
+   */
+  public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck,
+                                      HoodieTableMetaClient tableMetaClient,
+                                      HoodieTimeline timeline,
+                                      List<Path> inputPaths) throws IOException {
+    Set<String> partitionsToList = new HashSet<>();
+    for (HoodieInstant commit : commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+    }
+    if (partitionsToList.isEmpty()) {
+      return Option.empty();
+    }
+    String incrementalInputPaths = partitionsToList.stream()
+        .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+        .filter(s -> {
+          /*
+           * Ensure to return only results from the original input path that has incremental changes
+           * This check is needed for the following corner case -  When the caller invokes
+           * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
+           * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
+           * accidentally return all incremental changes for the entire table in every listStatus()
+           * call. This will create redundant splits. Instead we only want to return the incremental
+           * changes (if so any) in that batch of input paths.
+           *
+           * NOTE on Hive queries that are executed using Fetch task:
+           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
+           * listed in every such listStatus() call. In order to avoid this, it might be useful to
+           * disable fetch tasks using the hive session property for incremental queries:
+           * `set hive.fetch.task.conversion=none;`
+           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
+           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
+           * those partitions.
+           */
+          for (Path path : inputPaths) {
+            if (path.toString().contains(s)) {
+              return true;
+            }
+          }
+          return false;
+        })
+        .collect(Collectors.joining(","));
+    return Option.of(incrementalInputPaths);
+  }
+
+  /**
+   * Extract HoodieTimeline based on HoodieTableMetaClient.
+   * @param job
+   * @param tableMetaClient
+   * @return
+   */
+  public static Option<HoodieTimeline> getTimeline(Job job, HoodieTableMetaClient tableMetaClient) {

Review comment:
       rename: `getFilteredCommitsTimeline()` to provide more context as well, now that the code has been moved out of the original source file.
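
       As a rough sketch, the renamed method could read as below; the body mirrors the one shown further down in this diff (comments added for context), only the name changes, assuming it stays inside HoodieInputFormatUtils with the imports already in this file:

           public static Option<HoodieTimeline> getFilteredCommitsTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
             String tableName = tableMetaClient.getTableConfig().getTableName();
             HoodieDefaultTimeline baseTimeline;
             if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
               // Stop at the earliest pending compaction so incremental readers do not skip its updates.
               baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
             } else {
               baseTimeline = tableMetaClient.getActiveTimeline();
             }
             // Only completed commit-like instants are handed to the input format.
             return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
           }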

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class HoodieInputFormatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
+
+  /**
+   * Filter any specific instants that we do not want to process.
+   * example timeline:
+   *
+   * t0 -> create bucket1.parquet
+   * t1 -> create and append updates bucket1.log
+   * t2 -> request compaction
+   * t3 -> create bucket2.parquet
+   *
+   * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1
+   *
+   * To workaround this problem, we want to stop returning data belonging to commits > t2.
+   * After compaction is complete, incremental reader would see updates in t2, t3, so on.
+   * @param timeline
+   * @return
+   */
+  public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
+    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline
+        .filterPendingCompactionTimeline().firstInstant();
+    if (pendingCompactionInstant.isPresent()) {
+      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline
+          .findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
+      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
+          - instantsTimeline.getCommitsTimeline().countInstants();
+      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
+          + " skipping " + numCommitsFilteredByCompaction + " commits");
+
+      return instantsTimeline;
+    } else {
+      return timeline;
+    }
+  }
+
+  /**
+   * Extract partitions touched by the commitsToCheck.
+   * @param commitsToCheck
+   * @param tableMetaClient
+   * @param timeline
+   * @param inputPaths
+   * @return
+   * @throws IOException
+   */
+  public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck,
+                                      HoodieTableMetaClient tableMetaClient,
+                                      HoodieTimeline timeline,
+                                      List<Path> inputPaths) throws IOException {
+    Set<String> partitionsToList = new HashSet<>();
+    for (HoodieInstant commit : commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+    }
+    if (partitionsToList.isEmpty()) {
+      return Option.empty();
+    }
+    String incrementalInputPaths = partitionsToList.stream()
+        .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+        .filter(s -> {
+          /*
+           * Ensure to return only results from the original input path that has incremental changes
+           * This check is needed for the following corner case -  When the caller invokes
+           * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
+           * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
+           * accidentally return all incremental changes for the entire table in every listStatus()
+           * call. This will create redundant splits. Instead we only want to return the incremental
+           * changes (if so any) in that batch of input paths.
+           *
+           * NOTE on Hive queries that are executed using Fetch task:
+           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
+           * listed in every such listStatus() call. In order to avoid this, it might be useful to
+           * disable fetch tasks using the hive session property for incremental queries:
+           * `set hive.fetch.task.conversion=none;`
+           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
+           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
+           * those partitions.
+           */
+          for (Path path : inputPaths) {
+            if (path.toString().contains(s)) {
+              return true;
+            }
+          }
+          return false;
+        })
+        .collect(Collectors.joining(","));
+    return Option.of(incrementalInputPaths);
+  }
+
+  /**
+   * Extract HoodieTimeline based on HoodieTableMetaClient.
+   * @param job
+   * @param tableMetaClient
+   * @return
+   */
+  public static Option<HoodieTimeline> getTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    HoodieDefaultTimeline baseTimeline;
+    if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
+      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
+    } else {
+      baseTimeline = tableMetaClient.getActiveTimeline();
+    }
+    return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
+  }
+
+  /**
+   * Get commits to check from Hive map reduce configuration.
+   * @param job
+   * @param tableName
+   * @param timeline
+   * @return
+   */
+  public static Option<List<HoodieInstant>> getCommitsToCheck(Job job, String tableName, HoodieTimeline timeline) {
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
+    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+    return Option.of(timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+        .getInstants().collect(Collectors.toList()));
+  }
+
+  /**
+   * Generate a HoodieTableMetaClient for a given partition.
+   * @param conf
+   * @param partitions
+   * @return
+   */
+  public static Map<Path, HoodieTableMetaClient> getMetaClientPerPartition(Configuration conf, Set<Path> partitions) {

Review comment:
       rename: `getMetaClientByBasePath()`, which is what this really is.
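
       A sketch of the suggested rename; the body mirrors the diff (comments added for context), only the name changes:

           public static Map<Path, HoodieTableMetaClient> getMetaClientByBasePath(Configuration conf, Set<Path> partitions) {
             Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
             return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
               // Reuse an existing meta client whose base path is a prefix of this partition path.
               Option<String> matchingBasePath = Option.fromJavaOptional(
                   metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst());
               if (matchingBasePath.isPresent()) {
                 return metaClientMap.get(matchingBasePath.get());
               }
               try {
                 HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p);
                 metaClientMap.put(metaClient.getBasePath(), metaClient);
                 return metaClient;
               } catch (IOException e) {
                 throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
               }
             }));
           }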

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.config;
+
+/**
+ * Class to hold props related to Hoodie RealtimeInputFormat and RealtimeRecordReader.
+ */
+public final class HoodieRealtimeConfig {

Review comment:
       Calling this a config is a bit misleading; these are just constants.
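
       For illustration, a pure constants holder usually looks like the sketch below: a final class with a private constructor so it is never instantiated. The name here is only a placeholder, not a suggestion from this review; the constants are the ones from this diff.

           public final class HoodieRealtimeRecordConstants {
             private HoodieRealtimeRecordConstants() {
               // constants only, no instances
             }

             // These positions have to be deterministic across all tables
             public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
             public static final int HOODIE_RECORD_KEY_COL_POS = 2;
             public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
           }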

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.config;
+
+/**
+ * Class to hold props related to Hoodie RealtimeInputFormat and RealtimeRecordReader.
+ */
+public final class HoodieRealtimeConfig {
+  // These positions have to be deterministic across all tables
+  public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
+  public static final int HOODIE_RECORD_KEY_COL_POS = 2;
+  public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
+  public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
+
+  // Fraction of mapper/reducer task memory used for compaction of log files
+  public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
+  public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
+  // used to choose a trade off between IO vs Memory when performing compaction process
+  // Depending on outputfile size and memory provided, choose true to avoid OOM for large file
+  // size + small memory
+  public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
+  public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
+
+  // Property to set the max memory for dfs inputstream buffer size
+  public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";

Review comment:
       These do seem like configs, so how about splitting them out into a separate file? And move lines 25-28 back to the input format?
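
       As a sketch under that suggestion, the remaining file would hold only the tunable properties (constants copied from this diff; whether the column positions move back is the open question above):

           public final class HoodieRealtimeConfig {
             private HoodieRealtimeConfig() {
             }

             // Fraction of mapper/reducer task memory used for compaction of log files
             public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
             public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
             // Trade-off between IO and memory when compacting log files; true avoids OOM for
             // large files combined with small task memory
             public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
             public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
             // Max memory for the DFS input stream buffer size
             public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
           }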

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class HoodieInputFormatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
+
+  /**
+   * Filter any specific instants that we do not want to process.
+   * example timeline:
+   *
+   * t0 -> create bucket1.parquet
+   * t1 -> create and append updates bucket1.log
+   * t2 -> request compaction
+   * t3 -> create bucket2.parquet
+   *
+   * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1
+   *
+   * To workaround this problem, we want to stop returning data belonging to commits > t2.
+   * After compaction is complete, incremental reader would see updates in t2, t3, so on.
+   * @param timeline
+   * @return
+   */
+  public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
+    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline
+        .filterPendingCompactionTimeline().firstInstant();
+    if (pendingCompactionInstant.isPresent()) {
+      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline
+          .findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
+      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
+          - instantsTimeline.getCommitsTimeline().countInstants();
+      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
+          + " skipping " + numCommitsFilteredByCompaction + " commits");
+
+      return instantsTimeline;
+    } else {
+      return timeline;
+    }
+  }
+
+  /**
+   * Extract partitions touched by the commitsToCheck.
+   * @param commitsToCheck
+   * @param tableMetaClient
+   * @param timeline
+   * @param inputPaths
+   * @return
+   * @throws IOException
+   */
+  public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck,
+                                      HoodieTableMetaClient tableMetaClient,
+                                      HoodieTimeline timeline,
+                                      List<Path> inputPaths) throws IOException {
+    Set<String> partitionsToList = new HashSet<>();
+    for (HoodieInstant commit : commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+    }
+    if (partitionsToList.isEmpty()) {
+      return Option.empty();
+    }
+    String incrementalInputPaths = partitionsToList.stream()
+        .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+        .filter(s -> {
+          /*
+           * Ensure to return only results from the original input path that has incremental changes
+           * This check is needed for the following corner case -  When the caller invokes
+           * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
+           * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
+           * accidentally return all incremental changes for the entire table in every listStatus()
+           * call. This will create redundant splits. Instead we only want to return the incremental
+           * changes (if so any) in that batch of input paths.
+           *
+           * NOTE on Hive queries that are executed using Fetch task:
+           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
+           * listed in every such listStatus() call. In order to avoid this, it might be useful to
+           * disable fetch tasks using the hive session property for incremental queries:
+           * `set hive.fetch.task.conversion=none;`
+           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
+           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
+           * those partitions.
+           */
+          for (Path path : inputPaths) {
+            if (path.toString().contains(s)) {
+              return true;
+            }
+          }
+          return false;
+        })
+        .collect(Collectors.joining(","));
+    return Option.of(incrementalInputPaths);
+  }
+
+  /**
+   * Extract HoodieTimeline based on HoodieTableMetaClient.
+   * @param job
+   * @param tableMetaClient
+   * @return
+   */
+  public static Option<HoodieTimeline> getTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    HoodieDefaultTimeline baseTimeline;
+    if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
+      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
+    } else {
+      baseTimeline = tableMetaClient.getActiveTimeline();
+    }
+    return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
+  }
+
+  /**
+   * Get commits to check from Hive map reduce configuration.
+   * @param job
+   * @param tableName
+   * @param timeline
+   * @return
+   */
+  public static Option<List<HoodieInstant>> getCommitsToCheck(Job job, String tableName, HoodieTimeline timeline) {
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
+    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+    return Option.of(timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+        .getInstants().collect(Collectors.toList()));
+  }
+
+  /**
+   * Generate a HoodieTableMetaClient for a given partition.
+   * @param conf
+   * @param partitions
+   * @return
+   */
+  public static Map<Path, HoodieTableMetaClient> getMetaClientPerPartition(Configuration conf, Set<Path> partitions) {
+    Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
+    return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
+      // find if we have a metaclient already for this partition.
+      Option<String> matchingBasePath = Option.fromJavaOptional(
+          metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst());
+      if (matchingBasePath.isPresent()) {
+        return metaClientMap.get(matchingBasePath.get());
+      }
+
+      try {
+        HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p);
+        metaClientMap.put(metaClient.getBasePath(), metaClient);
+        return metaClient;
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
+      }
+    }));
+  }
+
+  /**
+   * Extract HoodieTableMetaClient from a partition path(not base path).
+   * @param fs
+   * @param dataPath
+   * @return
+   * @throws IOException
+   */
+  public static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) throws IOException {
+    int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
+    if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
+      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
+      metadata.readFromFS();
+      levels = metadata.getPartitionDepth();
+    }
+    Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
+    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
+    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
+  }
+
+  /**
+   * Filter a list of FileStatus based on commitsToCheck for incremental view.
+   * @param job
+   * @param tableMetaClient
+   * @param timeline
+   * @param fileStatuses
+   * @param commitsToCheck
+   * @return
+   */
+  public static List<FileStatus> filterIncrementalFileStatus(
+      Job job, HoodieTableMetaClient tableMetaClient, HoodieTimeline timeline,

Review comment:
       optional: use a consistent folding style for the args?
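
       For example, folding one parameter per continuation line, as getAffectedPartitions() does earlier in this class, would look like the sketch below; the body is unchanged from the diff:

           public static List<FileStatus> filterIncrementalFileStatus(Job job,
                                                                      HoodieTableMetaClient tableMetaClient,
                                                                      HoodieTimeline timeline,
                                                                      FileStatus[] fileStatuses,
                                                                      List<HoodieInstant> commitsToCheck) {
             TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
             List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
             List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
             List<FileStatus> returns = new ArrayList<>();
             for (HoodieBaseFile filteredFile : filteredFiles) {
               LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
               filteredFile = checkFileStatus(job.getConfiguration(), filteredFile);
               returns.add(filteredFile.getFileStatus());
             }
             LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
             return returns;
           }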

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class HoodieInputFormatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
+
+  /**
+   * Filter any specific instants that we do not want to process.
+   * example timeline:
+   *
+   * t0 -> create bucket1.parquet
+   * t1 -> create and append updates bucket1.log
+   * t2 -> request compaction
+   * t3 -> create bucket2.parquet
+   *
+   * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1
+   *
+   * To workaround this problem, we want to stop returning data belonging to commits > t2.
+   * After compaction is complete, incremental reader would see updates in t2, t3, so on.
+   * @param timeline
+   * @return
+   */
+  public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
+    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline
+        .filterPendingCompactionTimeline().firstInstant();
+    if (pendingCompactionInstant.isPresent()) {
+      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline
+          .findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
+      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
+          - instantsTimeline.getCommitsTimeline().countInstants();
+      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
+          + " skipping " + numCommitsFilteredByCompaction + " commits");
+
+      return instantsTimeline;
+    } else {
+      return timeline;
+    }
+  }
+
+  /**
+   * Extract partitions touched by the commitsToCheck.
+   * @param commitsToCheck
+   * @param tableMetaClient
+   * @param timeline
+   * @param inputPaths
+   * @return
+   * @throws IOException
+   */
+  public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck,
+                                      HoodieTableMetaClient tableMetaClient,
+                                      HoodieTimeline timeline,
+                                      List<Path> inputPaths) throws IOException {
+    Set<String> partitionsToList = new HashSet<>();
+    for (HoodieInstant commit : commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+    }
+    if (partitionsToList.isEmpty()) {
+      return Option.empty();
+    }
+    String incrementalInputPaths = partitionsToList.stream()
+        .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+        .filter(s -> {
+          /*
+           * Ensure to return only results from the original input path that has incremental changes
+           * This check is needed for the following corner case -  When the caller invokes
+           * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
+           * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
+           * accidentally return all incremental changes for the entire table in every listStatus()
+           * call. This will create redundant splits. Instead we only want to return the incremental
+           * changes (if so any) in that batch of input paths.
+           *
+           * NOTE on Hive queries that are executed using Fetch task:
+           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
+           * listed in every such listStatus() call. In order to avoid this, it might be useful to
+           * disable fetch tasks using the hive session property for incremental queries:
+           * `set hive.fetch.task.conversion=none;`
+           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
+           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
+           * those partitions.
+           */
+          for (Path path : inputPaths) {
+            if (path.toString().contains(s)) {
+              return true;
+            }
+          }
+          return false;
+        })
+        .collect(Collectors.joining(","));
+    return Option.of(incrementalInputPaths);
+  }
+
+  /**
+   * Extract HoodieTimeline based on HoodieTableMetaClient.
+   * @param job
+   * @param tableMetaClient
+   * @return
+   */
+  public static Option<HoodieTimeline> getTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    HoodieDefaultTimeline baseTimeline;
+    if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
+      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
+    } else {
+      baseTimeline = tableMetaClient.getActiveTimeline();
+    }
+    return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
+  }
+
+  /**
+   * Get commits to check from Hive map reduce configuration.
+   * @param job
+   * @param tableName
+   * @param timeline
+   * @return
+   */
+  public static Option<List<HoodieInstant>> getCommitsToCheck(Job job, String tableName, HoodieTimeline timeline) {
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
+    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+    return Option.of(timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+        .getInstants().collect(Collectors.toList()));
+  }
+
+  /**
+   * Generate a HoodieTableMetaClient for a given partition.
+   * @param conf
+   * @param partitions
+   * @return
+   */
+  public static Map<Path, HoodieTableMetaClient> getMetaClientPerPartition(Configuration conf, Set<Path> partitions) {
+    Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
+    return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
+      // find if we have a metaclient already for this partition.
+      Option<String> matchingBasePath = Option.fromJavaOptional(
+          metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst());
+      if (matchingBasePath.isPresent()) {
+        return metaClientMap.get(matchingBasePath.get());
+      }
+
+      try {
+        HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p);
+        metaClientMap.put(metaClient.getBasePath(), metaClient);
+        return metaClient;
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
+      }
+    }));
+  }
+
+  /**
+   * Extract HoodieTableMetaClient from a partition path(not base path).
+   * @param fs
+   * @param dataPath
+   * @return
+   * @throws IOException
+   */
+  public static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) throws IOException {
+    int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
+    if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
+      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
+      metadata.readFromFS();
+      levels = metadata.getPartitionDepth();
+    }
+    Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
+    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
+    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
+  }
+
+  /**
+   * Filter a list of FileStatus based on commitsToCheck for incremental view.
+   * @param job
+   * @param tableMetaClient
+   * @param timeline
+   * @param fileStatuses
+   * @param commitsToCheck
+   * @return
+   */
+  public static List<FileStatus> filterIncrementalFileStatus(
+      Job job, HoodieTableMetaClient tableMetaClient, HoodieTimeline timeline,
+      FileStatus[] fileStatuses, List<HoodieInstant> commitsToCheck) {
+    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
+    List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
+    List<FileStatus> returns = new ArrayList<>();
+    for (HoodieBaseFile filteredFile : filteredFiles) {
+      LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
+      filteredFile = checkFileStatus(job.getConfiguration(), filteredFile);
+      returns.add(filteredFile.getFileStatus());
+    }
+    LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
+    return returns;
+  }
+
+  /**
+   * Takes in a list of filesStatus and a list of table metadatas. Groups the files status list
+   * based on given table metadata.
+   * @param fileStatuses
+   * @param metaClientList
+   * @return
+   * @throws IOException
+   */
+  public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
+      FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) {
+    // This assumes the paths for different tables are grouped together
+    Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
+    HoodieTableMetaClient metadata = null;
+    for (FileStatus status : fileStatuses) {
+      Path inputPath = status.getPath();
+      if (!inputPath.getName().endsWith(".parquet")) {
+        //FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
+        // with "."
+        continue;
+      }
+      if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath()))) {
+        for (HoodieTableMetaClient metaClient : metaClientList) {
+          if (inputPath.toString().contains(metaClient.getBasePath())) {
+            metadata = metaClient;
+            if (!grouped.containsKey(metadata)) {
+              grouped.put(metadata, new ArrayList<>());
+            }
+            break;
+          }
+        }
+      }
+      grouped.get(metadata).add(status);
+    }
+    return grouped;
+  }
+
+  /**
+   * Filters data files for a snapshot queried table.
+   * @param job
+   * @param metadata
+   * @param fileStatuses
+   * @return
+   */
+  public static List<FileStatus> filterFileStatusForSnapshotMode(
+      JobConf job, HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
+    FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+    }
+    // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
+    HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
+    // filter files on the latest commit found
+    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
+    LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
+    List<FileStatus> returns = new ArrayList<>();
+    for (HoodieBaseFile filteredFile : filteredFiles) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
+      }
+      filteredFile = checkFileStatus(job, filteredFile);
+      returns.add(filteredFile.getFileStatus());
+    }
+    return returns;
+  }
+
+  /**
+   * Checks the file status for a race condition which can set the file size to 0. 1. HiveInputFormat does
+   * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed.
+   * 3. Generation of splits looks at FileStatus size to create splits, which skips this file
+   * @param conf
+   * @param dataFile
+   * @return
+   */
+  private static HoodieBaseFile checkFileStatus(Configuration conf, HoodieBaseFile dataFile) {

Review comment:
       rename: `refreshFileStatus()`
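
       The helper's body is not quoted in this hunk, so the sketch below is only illustrative of the intent described in the Javadoc (re-fetch the FileStatus when a racy listing reported a zero size), not the PR's actual implementation:

           private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
             Path dataPath = dataFile.getFileStatus().getPath();
             try {
               // If the racy listing reported size 0, re-read the status from the filesystem.
               if (dataFile.getFileSize() == 0) {
                 FileSystem fs = dataPath.getFileSystem(conf);
                 LOG.info("Refreshing file status " + dataFile.getPath());
                 return new HoodieBaseFile(fs.getFileStatus(dataPath));
               }
               return dataFile;
             } catch (IOException e) {
               throw new HoodieIOException("Could not get FileStatus on path " + dataPath, e);
             }
           }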

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class HoodieInputFormatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
+
+  /**
+   * Filter any specific instants that we do not want to process.
+   * example timeline:
+   *
+   * t0 -> create bucket1.parquet
+   * t1 -> create and append updates bucket1.log
+   * t2 -> request compaction
+   * t3 -> create bucket2.parquet
+   *
+   * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1
+   *
+   * To workaround this problem, we want to stop returning data belonging to commits > t2.
+   * After compaction is complete, incremental reader would see updates in t2, t3, so on.
+   * @param timeline
+   * @return
+   */
+  public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
+    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline
+        .filterPendingCompactionTimeline().firstInstant();
+    if (pendingCompactionInstant.isPresent()) {
+      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline
+          .findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
+      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
+          - instantsTimeline.getCommitsTimeline().countInstants();
+      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
+          + " skipping " + numCommitsFilteredByCompaction + " commits");
+
+      return instantsTimeline;
+    } else {
+      return timeline;
+    }
+  }
+
+  /**
+   * Extract partitions touched by the commitsToCheck.
+   * @param commitsToCheck
+   * @param tableMetaClient
+   * @param timeline
+   * @param inputPaths
+   * @return
+   * @throws IOException
+   */
+  public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck,
+                                      HoodieTableMetaClient tableMetaClient,
+                                      HoodieTimeline timeline,
+                                      List<Path> inputPaths) throws IOException {
+    Set<String> partitionsToList = new HashSet<>();
+    for (HoodieInstant commit : commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+    }
+    if (partitionsToList.isEmpty()) {
+      return Option.empty();
+    }
+    String incrementalInputPaths = partitionsToList.stream()
+        .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+        .filter(s -> {
+          /*
+           * Ensure to return only results from the original input path that has incremental changes
+           * This check is needed for the following corner case -  When the caller invokes
+           * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
+           * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
+           * accidentally return all incremental changes for the entire table in every listStatus()
+           * call. This will create redundant splits. Instead we only want to return the incremental
+           * changes (if so any) in that batch of input paths.
+           *
+           * NOTE on Hive queries that are executed using Fetch task:
+           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
+           * listed in every such listStatus() call. In order to avoid this, it might be useful to
+           * disable fetch tasks using the hive session property for incremental queries:
+           * `set hive.fetch.task.conversion=none;`
+           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
+           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
+           * those partitions.
+           */
+          for (Path path : inputPaths) {
+            if (path.toString().contains(s)) {
+              return true;
+            }
+          }
+          return false;
+        })
+        .collect(Collectors.joining(","));
+    return Option.of(incrementalInputPaths);
+  }
+
+  /**
+   * Extract HoodieTimeline based on HoodieTableMetaClient.
+   * @param job
+   * @param tableMetaClient
+   * @return
+   */
+  public static Option<HoodieTimeline> getTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    HoodieDefaultTimeline baseTimeline;
+    if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
+      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
+    } else {
+      baseTimeline = tableMetaClient.getActiveTimeline();
+    }
+    return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
+  }
+
+  /**
+   * Get commits to check from Hive map reduce configuration.
+   * @param job
+   * @param tableName
+   * @param timeline
+   * @return
+   */
+  public static Option<List<HoodieInstant>> getCommitsToCheck(Job job, String tableName, HoodieTimeline timeline) {
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
+    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+    return Option.of(timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+        .getInstants().collect(Collectors.toList()));
+  }
+
+  /**
+   * Generate a HoodieTableMetaClient for a given partition.
+   * @param conf
+   * @param partitions
+   * @return
+   */
+  public static Map<Path, HoodieTableMetaClient> getMetaClientPerPartition(Configuration conf, Set<Path> partitions) {
+    Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
+    return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
+      // find if we have a metaclient already for this partition.
+      Option<String> matchingBasePath = Option.fromJavaOptional(
+          metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst());
+      if (matchingBasePath.isPresent()) {
+        return metaClientMap.get(matchingBasePath.get());
+      }
+
+      try {
+        HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p);
+        metaClientMap.put(metaClient.getBasePath(), metaClient);
+        return metaClient;
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
+      }
+    }));
+  }
+
+  /**
+   * Extract HoodieTableMetaClient from a partition path(not base path).
+   * @param fs
+   * @param dataPath
+   * @return
+   * @throws IOException
+   */
+  public static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) throws IOException {

Review comment:
       rename: `getTableMetaClientForBasePath()`
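
       A sketch of the suggested rename; the body mirrors the diff (one comment added for context), only the name changes:

           public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
             int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
             if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
               HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
               metadata.readFromFS();
               levels = metadata.getPartitionDepth();
             }
             // Walk up from the partition path to the table base path before opening the meta client.
             Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
             LOG.info("Reading hoodie metadata from path " + baseDir.toString());
             return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
           }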

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class HoodieInputFormatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
+
+  /**
+   * Filter out any specific instants that we do not want to process.
+   * Example timeline:
+   *
+   * t0 -> create bucket1.parquet
+   * t1 -> create and append updates bucket1.log
+   * t2 -> request compaction
+   * t3 -> create bucket2.parquet
+   *
+   * If compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip the updates in t1.
+   *
+   * To work around this problem, we stop returning data belonging to commits > t2.
+   * After compaction completes, the incremental reader would see the updates in t2, t3, and so on.
+   * @param timeline active timeline to filter
+   * @return timeline trimmed to instants before the earliest pending compaction, if any
+   */
+  public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
+    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline
+        .filterPendingCompactionTimeline().firstInstant();
+    if (pendingCompactionInstant.isPresent()) {
+      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline
+          .findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
+      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
+          - instantsTimeline.getCommitsTimeline().countInstants();
+      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
+          + " skipping " + numCommitsFilteredByCompaction + " commits");
+
+      return instantsTimeline;
+    } else {
+      return timeline;
+    }
+  }
+
+  /**
+   * Extract partitions touched by the commitsToCheck.
+   * @param commitsToCheck
+   * @param tableMetaClient
+   * @param timeline
+   * @param inputPaths
+   * @return
+   * @throws IOException
+   */
+  public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck,
+                                      HoodieTableMetaClient tableMetaClient,
+                                      HoodieTimeline timeline,
+                                      List<Path> inputPaths) throws IOException {
+    Set<String> partitionsToList = new HashSet<>();
+    for (HoodieInstant commit : commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+    }
+    if (partitionsToList.isEmpty()) {
+      return Option.empty();
+    }
+    String incrementalInputPaths = partitionsToList.stream()
+        .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+        .filter(s -> {
+          /*
+           * Ensure we return only results from the original input paths that have incremental changes.
+           * This check is needed for the following corner case: when the caller invokes
+           * HoodieInputFormat.listStatus multiple times with small batches of Hive partitions each
+           * time (e.g. the Hive fetch task calls listStatus once per partition), we do not want to
+           * accidentally return all incremental changes for the entire table in every listStatus()
+           * call. That would create redundant splits. Instead we only want to return the incremental
+           * changes (if any) in that batch of input paths.
+           *
+           * NOTE on Hive queries that are executed using Fetch task:
+           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
+           * listed in every such listStatus() call. In order to avoid this, it might be useful to
+           * disable fetch tasks using the hive session property for incremental queries:
+           * `set hive.fetch.task.conversion=none;`
+           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
+           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
+           * those partitions.
+           */
+          for (Path path : inputPaths) {
+            if (path.toString().contains(s)) {
+              return true;
+            }
+          }
+          return false;
+        })
+        .collect(Collectors.joining(","));
+    return Option.of(incrementalInputPaths);
+  }
+
+  /**
+   * Extract HoodieTimeline based on HoodieTableMetaClient.
+   * @param job
+   * @param tableMetaClient
+   * @return
+   */
+  public static Option<HoodieTimeline> getTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    HoodieDefaultTimeline baseTimeline;
+    if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
+      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
+    } else {
+      baseTimeline = tableMetaClient.getActiveTimeline();
+    }
+    return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
+  }
+
+  /**
+   * Get commits to check from Hive map reduce configuration.
+   * @param job
+   * @param tableName
+   * @param timeline
+   * @return
+   */
+  public static Option<List<HoodieInstant>> getCommitsToCheck(Job job, String tableName, HoodieTimeline timeline) {

Review comment:
       rename: `getCommitsForIncrementalQuery()` to give more context. 
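
To make the incremental flow easier to follow, a hedged sketch of how these helpers chain together for an incremental query (the `jobConf`, `basePath`, `tableName` and `inputPaths` variables are placeholders, the two-argument meta client constructor is assumed, and the fragment assumes it runs inside a method declaring `throws IOException`):

```java
// Illustrative fragment: drive an incremental query through the utils in this PR.
Job job = Job.getInstance(jobConf);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, basePath);

Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getTimeline(job, metaClient);
if (timeline.isPresent()) {
  Option<List<HoodieInstant>> commitsToCheck =
      HoodieInputFormatUtils.getCommitsToCheck(job, tableName, timeline.get());
  if (commitsToCheck.isPresent()) {
    // If present, this is the comma separated list of partition paths with incremental changes.
    Option<String> incrementalPaths = HoodieInputFormatUtils.getAffectedPartitions(
        commitsToCheck.get(), metaClient, timeline.get(), inputPaths);
  }
}
```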

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class HoodieInputFormatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
+
+  /**
+   * Filter out any specific instants that we do not want to process.
+   * Example timeline:
+   *
+   * t0 -> create bucket1.parquet
+   * t1 -> create and append updates bucket1.log
+   * t2 -> request compaction
+   * t3 -> create bucket2.parquet
+   *
+   * If compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip the updates in t1.
+   *
+   * To work around this problem, we stop returning data belonging to commits > t2.
+   * After compaction completes, the incremental reader would see the updates in t2, t3, and so on.
+   * @param timeline active timeline to filter
+   * @return timeline trimmed to instants before the earliest pending compaction, if any
+   */
+  public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
+    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline
+        .filterPendingCompactionTimeline().firstInstant();
+    if (pendingCompactionInstant.isPresent()) {
+      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline
+          .findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
+      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
+          - instantsTimeline.getCommitsTimeline().countInstants();
+      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
+          + " skipping " + numCommitsFilteredByCompaction + " commits");
+
+      return instantsTimeline;
+    } else {
+      return timeline;
+    }
+  }
+
+  /**
+   * Extract partitions touched by the commitsToCheck.
+   * @param commitsToCheck
+   * @param tableMetaClient
+   * @param timeline
+   * @param inputPaths
+   * @return
+   * @throws IOException
+   */
+  public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck,
+                                      HoodieTableMetaClient tableMetaClient,
+                                      HoodieTimeline timeline,
+                                      List<Path> inputPaths) throws IOException {
+    Set<String> partitionsToList = new HashSet<>();
+    for (HoodieInstant commit : commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+    }
+    if (partitionsToList.isEmpty()) {
+      return Option.empty();
+    }
+    String incrementalInputPaths = partitionsToList.stream()
+        .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+        .filter(s -> {
+          /*
+           * Ensure we return only results from the original input paths that have incremental changes.
+           * This check is needed for the following corner case: when the caller invokes
+           * HoodieInputFormat.listStatus multiple times with small batches of Hive partitions each
+           * time (e.g. the Hive fetch task calls listStatus once per partition), we do not want to
+           * accidentally return all incremental changes for the entire table in every listStatus()
+           * call. That would create redundant splits. Instead we only want to return the incremental
+           * changes (if any) in that batch of input paths.
+           *
+           * NOTE on Hive queries that are executed using Fetch task:
+           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
+           * listed in every such listStatus() call. In order to avoid this, it might be useful to
+           * disable fetch tasks using the hive session property for incremental queries:
+           * `set hive.fetch.task.conversion=none;`
+           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
+           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
+           * those partitions.
+           */
+          for (Path path : inputPaths) {
+            if (path.toString().contains(s)) {
+              return true;
+            }
+          }
+          return false;
+        })
+        .collect(Collectors.joining(","));
+    return Option.of(incrementalInputPaths);
+  }
+
+  /**
+   * Extract HoodieTimeline based on HoodieTableMetaClient.
+   * @param job
+   * @param tableMetaClient
+   * @return
+   */
+  public static Option<HoodieTimeline> getTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    HoodieDefaultTimeline baseTimeline;
+    if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
+      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
+    } else {
+      baseTimeline = tableMetaClient.getActiveTimeline();
+    }
+    return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
+  }
+
+  /**
+   * Get commits to check from Hive map reduce configuration.
+   * @param job
+   * @param tableName
+   * @param timeline
+   * @return
+   */
+  public static Option<List<HoodieInstant>> getCommitsToCheck(Job job, String tableName, HoodieTimeline timeline) {
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
+    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+    return Option.of(timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+        .getInstants().collect(Collectors.toList()));
+  }
+
+  /**
+   * Build a map from each partition path to the HoodieTableMetaClient of the table containing it.
+   * @param conf Hadoop configuration used to access the file system
+   * @param partitions set of partition paths
+   * @return map of partition path to its table's meta client
+   */
+  public static Map<Path, HoodieTableMetaClient> getMetaClientPerPartition(Configuration conf, Set<Path> partitions) {
+    Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
+    return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
+      // find if we have a metaclient already for this partition.

Review comment:
       this comment is wrong: the check is for an existing meta client whose base path is a prefix of this partition path (i.e. a client for the same table), not for one created for this exact partition.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class HoodieRealtimeRecordReaderUtils {
+
+  /**
+   * Reads the schema from the parquet file. This is different from ParquetUtils in that it uses the Twitter parquet
+   * library to support Hive 1.1.0.
+   */
+  public static MessageType readSchema(Configuration conf, Path parquetFilePath) {

Review comment:
       we should really see if we can move this to hudi-common `ParquetUtils` 
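
The body is truncated in the hunk above; if it is the usual footer read, a shared helper in hudi-common `ParquetUtils` might look roughly like this (sketch only, assuming `ParquetFileReader.readFooter(Configuration, Path)` is available in the parquet version we target):

```java
// Sketch: read only the parquet footer and return the file schema.
public static MessageType readSchema(Configuration conf, Path parquetFilePath) {
  try {
    return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema();
  } catch (IOException e) {
    throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e);
  }
}
```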

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
+
+  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+    Map<Path, List<FileSplit>> partitionsToParquetSplits =
+        fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
+    // TODO(vc): Should we also handle non-Hoodie splits here?
+    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getMetaClientPerPartition(conf, partitionsToParquetSplits.keySet());
+
+    // for all unique split parents, obtain all delta files based on delta commit timeline,
+    // grouped on file id
+    List<HoodieRealtimeFileSplit> rtSplits = new ArrayList<>();
+    partitionsToParquetSplits.keySet().forEach(partitionPath -> {
+      // for each partition path obtain the data & log file groupings, then map back to inputsplits
+      HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+      String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
+
+      try {
+        // Both commit and delta-commits are included - pick the latest completed one
+        Option<HoodieInstant> latestCompletedInstant =
+            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+        Stream<FileSlice> latestFileSlices = latestCompletedInstant
+            .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
+            .orElse(Stream.empty());
+
+        // subgroup splits again by file id & match with log files.
+        Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
+            .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
+        latestFileSlices.forEach(fileSlice -> {
+          List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
+          dataFileSplits.forEach(split -> {
+            try {
+              List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+                  .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
+              // Get the max commit time from the last delta commit, compaction, or commit
+              // (the latter when bootstrapped from a COW table)
+              String maxCommitTime = metaClient
+                  .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+                      HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+                  .filterCompletedInstants().lastInstant().get().getTimestamp();
+              rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
+            } catch (IOException e) {
+              throw new HoodieIOException("Error creating hoodie real time split ", e);
+            }
+          });
+        });
+      } catch (Exception e) {
+        throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
+      }
+    });
+    LOG.info("Returning a total splits of " + rtSplits.size());
+    return rtSplits.toArray(new InputSplit[0]);
+  }
+
+  // Returns a map from each parquet (base) file to the list of log files in the same file group.
+  public static Map<String, List<String>> getRealtimeFileGroup(Configuration conf, Stream<FileStatus> fileStatuses) {

Review comment:
       rename: `groupLogsByBaseFile()`
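
The body is cut off in the hunk above, so purely to illustrate the rename, one plausible shape for it, reusing `getMetaClientPerPartition` from this PR and the file system view calls already used in `getRealtimeSplits` (treat the exact calls as assumptions, not the actual implementation):

```java
// Illustrative sketch: for each partition, map the latest base (parquet) file
// of every file group to its sorted list of log files.
public static Map<String, List<String>> groupLogsByBaseFile(Configuration conf,
                                                            Stream<FileStatus> fileStatuses) {
  Map<Path, List<FileStatus>> partitionsToFiles =
      fileStatuses.collect(Collectors.groupingBy(status -> status.getPath().getParent()));
  Map<Path, HoodieTableMetaClient> metaClients =
      getMetaClientPerPartition(conf, partitionsToFiles.keySet());

  Map<String, List<String>> baseFileToLogs = new HashMap<>();
  partitionsToFiles.keySet().forEach(partitionPath -> {
    HoodieTableMetaClient metaClient = metaClients.get(partitionPath);
    HoodieTableFileSystemView fsView =
        new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
    String relPartitionPath =
        FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
    fsView.getLatestFileSlices(relPartitionPath)
        .filter(slice -> slice.getBaseFile().isPresent())
        .forEach(slice -> baseFileToLogs.put(
            slice.getBaseFile().get().getPath(),
            slice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList())));
  });
  return baseFileToLogs;
}
```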



