You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/06/09 13:10:23 UTC

[hudi] branch master updated: [HUDI-822] decouple Hudi related logics from HoodieInputFormat (#1592)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 37838ce  [HUDI-822] decouple Hudi related logics from HoodieInputFormat (#1592)
37838ce is described below

commit 37838cea6094ddc66191df42e8b2c84f132d1623
Author: Gary Li <ya...@gmail.com>
AuthorDate: Tue Jun 9 06:10:16 2020 -0700

    [HUDI-822] decouple Hudi related logics from HoodieInputFormat (#1592)
    
    - Refactoring business logic out of InputFormat into Utils helpers.
---
 .../hudi/table/TestHoodieMergeOnReadTable.java     |  12 +-
 .../commit/TestCopyOnWriteActionExecutor.java      |  10 +-
 .../hudi/hadoop/HoodieParquetInputFormat.java      | 236 ++--------------
 .../hudi/hadoop/HoodieROTablePathFilter.java       |   3 +-
 .../org/apache/hudi/hadoop/InputPathHandler.java   |   4 +-
 .../hudi/hadoop/config/HoodieRealtimeConfig.java   |  42 +++
 .../realtime/AbstractRealtimeRecordReader.java     | 271 +------------------
 .../HoodieCombineRealtimeRecordReader.java         |   3 +-
 .../realtime/HoodieParquetRealtimeInputFormat.java | 106 +-------
 .../realtime/RealtimeCompactedRecordReader.java    |  35 ++-
 .../realtime/RealtimeUnmergedRecordReader.java     |   8 +-
 .../HoodieHiveUtils.java}                          |   6 +-
 .../HoodieInputFormatUtils.java}                   | 300 ++++++++++-----------
 .../utils/HoodieRealtimeInputFormatUtils.java      | 154 +++++++++++
 .../HoodieRealtimeRecordReaderUtils.java}          | 232 +++-------------
 .../hudi/hadoop/TestHoodieParquetInputFormat.java  |  27 +-
 .../realtime/TestHoodieRealtimeRecordReader.java   |   3 +-
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |  10 +-
 18 files changed, 493 insertions(+), 969 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 57c0d8d..53de65e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -46,7 +46,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.hadoop.HoodieHiveUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.index.HoodieIndex;
@@ -1463,19 +1463,19 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
   private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) {
     String modePropertyName =
-            String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
-    jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+            String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
 
     String startCommitTimestampName =
-            String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+            String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.set(startCommitTimestampName, startCommit);
 
     String maxCommitPulls =
-        String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
 
     String stopAtCompactionPropName =
-        String.format(HoodieHiveUtil.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction);
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 63a04ec..c95a917 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -32,7 +32,7 @@ import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.hadoop.HoodieHiveUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.table.HoodieCopyOnWriteTable;
@@ -225,15 +225,15 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
 
   private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
     String modePropertyName =
-            String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
-    jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+            String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
 
     String startCommitTimestampName =
-            String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+            String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.set(startCommitTimestampName, startCommit);
 
     String maxCommitPulls =
-            String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+            String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 5a10f3c..4db928c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,10 +18,17 @@
 
 package org.apache.hudi.hadoop;
 
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.io.ArrayWritable;
@@ -31,31 +38,14 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * HoodieInputFormat which understands the Hoodie File Structure and filters files based on the Hoodie Mode. If paths
@@ -69,10 +59,14 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
 
   protected Configuration conf;
 
+  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
+  }
+
   @Override
   public FileStatus[] listStatus(JobConf job) throws IOException {
     // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
-    List<String> incrementalTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job));
+    List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job));
     InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
     List<FileStatus> returns = new ArrayList<>();
 
@@ -107,10 +101,10 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
       FileStatus[] fileStatuses = super.listStatus(job);
       Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
-          groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
+          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
       LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
       for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
-        List<FileStatus> result = filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue());
+        List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
         if (result != null) {
           returns.addAll(result);
         }
@@ -120,36 +114,6 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
   }
 
   /**
-   * Filter any specific instants that we do not want to process.
-   * example timeline:
-   *
-   * t0 -> create bucket1.parquet
-   * t1 -> create and append updates bucket1.log
-   * t2 -> request compaction
-   * t3 -> create bucket2.parquet
-   *
-   * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1
-   *
-   * To workaround this problem, we want to stop returning data belonging to commits > t2.
-   * After compaction is complete, incremental reader would see updates in t2, t3, so on.
-   */
-  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
-    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
-    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant();
-    if (pendingCompactionInstant.isPresent()) {
-      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline.findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
-      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
-          - instantsTimeline.getCommitsTimeline().countInstants();
-      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
-              + " skipping " + numCommitsFilteredByCompaction + " commits");
-
-      return instantsTimeline;
-    } else {
-      return timeline;
-    }
-  }
-
-  /**
    * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
    * partitions and then filtering based on the commits of interest, this logic first extracts the
    * partitions touched by the desired commits and then lists only those partitions.
@@ -158,157 +122,22 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
     String tableName = tableMetaClient.getTableConfig().getTableName();
     Job jobContext = Job.getInstance(job);
-    HoodieDefaultTimeline baseTimeline;
-    if (HoodieHiveUtil.stopAtCompaction(jobContext, tableName)) {
-      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
-    } else {
-      baseTimeline = tableMetaClient.getActiveTimeline();
-    }
-
-    HoodieTimeline timeline = baseTimeline.getCommitsTimeline().filterCompletedInstants();
-    String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext, tableName);
-    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
-    Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName);
-    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
-    List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
-        .getInstants().collect(Collectors.toList());
-    // Extract partitions touched by the commitsToCheck
-    Set<String> partitionsToList = new HashSet<>();
-    for (HoodieInstant commit : commitsToCheck) {
-      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
-          HoodieCommitMetadata.class);
-      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
-    }
-    if (partitionsToList.isEmpty()) {
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
       return null;
     }
-    String incrementalInputPaths = partitionsToList.stream()
-        .map(s -> StringUtils.isNullOrEmpty(s) ? tableMetaClient.getBasePath() : tableMetaClient.getBasePath() + Path.SEPARATOR + s)
-        .filter(s -> {
-          /*
-           * Ensure to return only results from the original input path that has incremental changes
-           * This check is needed for the following corner case -  When the caller invokes
-           * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
-           * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
-           * accidentally return all incremental changes for the entire table in every listStatus()
-           * call. This will create redundant splits. Instead we only want to return the incremental
-           * changes (if so any) in that batch of input paths.
-           *
-           * NOTE on Hive queries that are executed using Fetch task:
-           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
-           * listed in every such listStatus() call. In order to avoid this, it might be useful to
-           * disable fetch tasks using the hive session property for incremental queries:
-           * `set hive.fetch.task.conversion=none;`
-           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
-           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
-           * those partitions.
-           */
-          for (Path path : inputPaths) {
-            if (path.toString().contains(s)) {
-              return true;
-            }
-          }
-          return false;
-        })
-        .collect(Collectors.joining(","));
-    if (StringUtils.isNullOrEmpty(incrementalInputPaths)) {
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
       return null;
     }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
     // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
-    setInputPaths(job, incrementalInputPaths);
-    FileStatus[] fileStatuses = super.listStatus(job);
-    BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
-    List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
-    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
-    List<FileStatus> returns = new ArrayList<>();
-    for (HoodieBaseFile filteredFile : filteredFiles) {
-      LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
-      filteredFile = checkFileStatus(filteredFile);
-      returns.add(filteredFile.getFileStatus());
-    }
-    LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
-    return returns;
-  }
-
-  /**
-   * Takes in a list of filesStatus and a list of table metadatas. Groups the files status list
-   * based on given table metadata.
-   * @param fileStatuses
-   * @param metaClientList
-   * @return
-   * @throws IOException
-   */
-  private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
-      FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) {
-    // This assumes the paths for different tables are grouped together
-    Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
-    HoodieTableMetaClient metadata = null;
-    for (FileStatus status : fileStatuses) {
-      Path inputPath = status.getPath();
-      if (!inputPath.getName().endsWith(".parquet")) {
-        //FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
-        // with "."
-        continue;
-      }
-      if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath()))) {
-        for (HoodieTableMetaClient metaClient : metaClientList) {
-          if (inputPath.toString().contains(metaClient.getBasePath())) {
-            metadata = metaClient;
-            if (!grouped.containsKey(metadata)) {
-              grouped.put(metadata, new ArrayList<>());
-            }
-            break;
-          }
-        }
-      }
-      grouped.get(metadata).add(status);
-    }
-    return grouped;
-  }
-
-  /**
-   * Filters data files for a snapshot queried table.
-   */
-  private List<FileStatus> filterFileStatusForSnapshotMode(
-      HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
-    FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
-    }
-    // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
-    HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
-    // filter files on the latest commit found
-    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
-    LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
-    List<FileStatus> returns = new ArrayList<>();
-    for (HoodieBaseFile filteredFile : filteredFiles) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
-      }
-      filteredFile = checkFileStatus(filteredFile);
-      returns.add(filteredFile.getFileStatus());
-    }
-    return returns;
-  }
-
-  /**
-   * Checks the file status for a race condition which can set the file size to 0. 1. HiveInputFormat does
-   * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed.
-   * 3. Generation of splits looks at FileStatus size to create splits, which skips this file
-   */
-  private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) {
-    Path dataPath = dataFile.getFileStatus().getPath();
-    try {
-      if (dataFile.getFileSize() == 0) {
-        FileSystem fs = dataPath.getFileSystem(conf);
-        LOG.info("Refreshing file status " + dataFile.getPath());
-        return new HoodieBaseFile(fs.getFileStatus(dataPath));
-      }
-      return dataFile;
-    } catch (IOException e) {
-      throw new HoodieIOException("Could not get FileStatus on path " + dataPath);
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
     }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = super.listStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
   }
 
   public void setConf(Configuration conf) {
@@ -338,19 +167,4 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     return super.getRecordReader(split, job, reporter);
   }
 
-  /**
-   * Read the table metadata from a data path. This assumes certain hierarchy of files which should be changed once a
-   * better way is figured out to pass in the hoodie meta directory
-   */
-  protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) throws IOException {
-    int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH;
-    if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
-      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
-      metadata.readFromFS();
-      levels = metadata.getPartitionDepth();
-    }
-    Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
-    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
-    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
-  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 4058875..d27d6ad 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -140,7 +141,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
       if (HoodiePartitionMetadata.hasPartitionMetadata(fs, folder)) {
         HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, folder);
         metadata.readFromFS();
-        baseDir = HoodieHiveUtil.getNthParent(folder, metadata.getPartitionDepth());
+        baseDir = HoodieHiveUtils.getNthParent(folder, metadata.getPartitionDepth());
       } else {
         baseDir = safeGetParentsParent(folder);
       }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
index e4a57aa..1ad3812 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
@@ -33,7 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.hudi.hadoop.HoodieParquetInputFormat.getTableMetaClient;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath;
 
 /**
  * InputPathHandler takes in a set of input paths and incremental tables list. Then, classifies the
@@ -97,7 +97,7 @@ public class InputPathHandler {
         // This path is for a table that we dont know about yet.
         HoodieTableMetaClient metaClient;
         try {
-          metaClient = getTableMetaClient(inputPath.getFileSystem(conf), inputPath);
+          metaClient = getTableMetaClientForBasePath(inputPath.getFileSystem(conf), inputPath);
           String tableName = metaClient.getTableConfig().getTableName();
           tableMetaClientMap.put(tableName, metaClient);
           tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
new file mode 100644
index 0000000..95945f3
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.config;
+
+/**
+ * Class to hold props related to Hoodie RealtimeInputFormat and RealtimeRecordReader.
+ */
+public final class HoodieRealtimeConfig {
+
+  // Fraction of mapper/reducer task memory used for compaction of log files
+  public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
+  public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
+  // Used to choose a trade-off between IO and memory when performing compaction.
+  // Depending on the output file size and the memory provided, choose true to avoid OOM for a
+  // large file size combined with small memory.
+  public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
+  public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
+
+  // Property to set the max memory for dfs inputstream buffer size
+  public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
+  // Set to a low value of 1 MB since there is no control over how many RecordReaders will be started in a mapper
+  public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB
+  // Property to set file path prefix for spillable file
+  public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path";
+  // Default file path prefix for spillable file
+  public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 5f675bb..b7da749 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -18,74 +18,34 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.LogReaderUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
-import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
  * Record Reader implementation to merge fresh avro data with base parquet data, to support real time queries.
  */
 public abstract class AbstractRealtimeRecordReader {
-
-  // Fraction of mapper/reducer task memory used for compaction of log files
-  public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
-  public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
-  // used to choose a trade off between IO vs Memory when performing compaction process
-  // Depending on outputfile size and memory provided, choose true to avoid OOM for large file
-  // size + small memory
-  public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
-  public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
-
-  // Property to set the max memory for dfs inputstream buffer size
-  public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
-  // Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper
-  public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB
-  // Property to set file path prefix for spillable file
-  public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path";
-  // Default file path prefix for spillable file
-  public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
-
   private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
 
   protected final HoodieRealtimeFileSplit split;
@@ -106,7 +66,7 @@ public abstract class AbstractRealtimeRecordReader {
     try {
       this.usesCustomPayload = usesCustomPayload();
       LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
-      baseFileSchema = readSchema(jobConf, split.getPath());
+      baseFileSchema = HoodieRealtimeRecordReaderUtils.readSchema(jobConf, split.getPath());
       init();
     } catch (IOException e) {
       throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
@@ -120,220 +80,6 @@ public abstract class AbstractRealtimeRecordReader {
   }
 
   /**
-   * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the twitter parquet to
-   * support hive 1.1.0
-   */
-  private static MessageType readSchema(Configuration conf, Path parquetFilePath) {
-    try {
-      return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema();
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e);
-    }
-  }
-
-  /**
-   * Prints a JSON representation of the ArrayWritable for easier debuggability.
-   */
-  protected static String arrayWritableToString(ArrayWritable writable) {
-    if (writable == null) {
-      return "null";
-    }
-    StringBuilder builder = new StringBuilder();
-    Writable[] values = writable.get();
-    builder.append("\"values_" + Math.random() + "_" + values.length + "\": {");
-    int i = 0;
-    for (Writable w : values) {
-      if (w instanceof ArrayWritable) {
-        builder.append(arrayWritableToString((ArrayWritable) w)).append(",");
-      } else {
-        builder.append("\"value" + i + "\":\"" + w + "\"").append(",");
-        if (w == null) {
-          builder.append("\"type" + i + "\":\"unknown\"").append(",");
-        } else {
-          builder.append("\"type" + i + "\":\"" + w.getClass().getSimpleName() + "\"").append(",");
-        }
-      }
-      i++;
-    }
-    builder.deleteCharAt(builder.length() - 1);
-    builder.append("}");
-    return builder.toString();
-  }
-
-  /**
-   * Given a comma separated list of field names and positions at which they appear on Hive, return
-   * an ordered list of field names, that can be passed onto storage.
-   */
-  private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
-    // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
-    // handles duplicate fields orders correctly.
-    // Fields Orders -> {@link https://github
-    // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
-    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
-    // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
-    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
-    String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
-    Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
-    String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
-    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
-        .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
-    Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
-    // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
-    if (fieldNamesSet.size() != fieldOrders.length) {
-      throw new HoodieException(String
-          .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
-              fieldNames.size(), fieldOrders.length));
-    }
-    TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
-    String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]);
-    for (int ox = 0; ox < fieldOrders.length; ox++) {
-      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
-    }
-    return new ArrayList<>(orderedFieldMap.values());
-  }
-
-  /**
-   * Generate a reader schema off the provided writeSchema, to just project out the provided columns.
-   */
-  public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Field> schemaFieldsMap,
-      List<String> fieldNames) {
-    /**
-     * Avro & Presto field names seems to be case sensitive (support fields differing only in case) whereas
-     * Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using
-     * spark.sql.caseSensitive=true
-     *
-     * For a RT table setup with no delta-files (for a latest file-slice) -> we translate parquet schema to Avro Here
-     * the field-name case is dependent on parquet schema. Hive (1.x/2.x/CDH) translate column projections to
-     * lower-cases
-     *
-     */
-    List<Schema.Field> projectedFields = new ArrayList<>();
-    for (String fn : fieldNames) {
-      Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
-      if (field == null) {
-        throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
-            + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
-      } else {
-        projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
-      }
-    }
-
-    Schema projectedSchema = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(),
-        writeSchema.getNamespace(), writeSchema.isError());
-    projectedSchema.setFields(projectedFields);
-    return projectedSchema;
-  }
-
-  public static Map<String, Field> getNameToFieldMap(Schema schema) {
-    return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r))
-        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
-  }
-
-  /**
-   * Convert the projected read from delta record into an array writable.
-   */
-  public static Writable avroToArrayWritable(Object value, Schema schema) {
-
-    if (value == null) {
-      return null;
-    }
-
-    switch (schema.getType()) {
-      case STRING:
-        return new Text(value.toString());
-      case BYTES:
-        return new BytesWritable((byte[]) value);
-      case INT:
-        return new IntWritable((Integer) value);
-      case LONG:
-        return new LongWritable((Long) value);
-      case FLOAT:
-        return new FloatWritable((Float) value);
-      case DOUBLE:
-        return new DoubleWritable((Double) value);
-      case BOOLEAN:
-        return new BooleanWritable((Boolean) value);
-      case NULL:
-        return null;
-      case RECORD:
-        GenericRecord record = (GenericRecord) value;
-        Writable[] recordValues = new Writable[schema.getFields().size()];
-        int recordValueIndex = 0;
-        for (Schema.Field field : schema.getFields()) {
-          recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema());
-        }
-        return new ArrayWritable(Writable.class, recordValues);
-      case ENUM:
-        return new Text(value.toString());
-      case ARRAY:
-        GenericArray arrayValue = (GenericArray) value;
-        Writable[] arrayValues = new Writable[arrayValue.size()];
-        int arrayValueIndex = 0;
-        for (Object obj : arrayValue) {
-          arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType());
-        }
-        // Hive 1.x will fail here, it requires values2 to be wrapped into another ArrayWritable
-        return new ArrayWritable(Writable.class, arrayValues);
-      case MAP:
-        Map mapValue = (Map) value;
-        Writable[] mapValues = new Writable[mapValue.size()];
-        int mapValueIndex = 0;
-        for (Object entry : mapValue.entrySet()) {
-          Map.Entry mapEntry = (Map.Entry) entry;
-          Writable[] nestedMapValues = new Writable[2];
-          nestedMapValues[0] = new Text(mapEntry.getKey().toString());
-          nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType());
-          mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues);
-        }
-        // Hive 1.x will fail here, it requires values3 to be wrapped into another ArrayWritable
-        return new ArrayWritable(Writable.class, mapValues);
-      case UNION:
-        List<Schema> types = schema.getTypes();
-        if (types.size() != 2) {
-          throw new IllegalArgumentException("Only support union with 2 fields");
-        }
-        Schema s1 = types.get(0);
-        Schema s2 = types.get(1);
-        if (s1.getType() == Schema.Type.NULL) {
-          return avroToArrayWritable(value, s2);
-        } else if (s2.getType() == Schema.Type.NULL) {
-          return avroToArrayWritable(value, s1);
-        } else {
-          throw new IllegalArgumentException("Only support union with null");
-        }
-      case FIXED:
-        if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) {
-          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema);
-          HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(),
-                  decimal.getScale());
-          return HiveDecimalWritable.enforcePrecisionScale(writable,
-                  decimal.getPrecision(),
-                  decimal.getScale());
-        }
-        return new BytesWritable(((GenericFixed) value).bytes());
-      default:
-        return null;
-    }
-  }
-
-  /**
-   * Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file to
-   * also be part of the projected schema. Hive expects the record reader implementation to return the row in its
-   * entirety (with un-projected column having null values). As we use writerSchema for this, make sure writer schema
-   * also includes partition columns
-   *
-   * @param schema Schema to be changed
-   */
-  private static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
-    final Set<String> firstLevelFieldNames =
-        schema.getFields().stream().map(Field::name).map(String::toLowerCase).collect(Collectors.toSet());
-    List<String> fieldsToAdd = partitioningFields.stream().map(String::toLowerCase)
-        .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList());
-
-    return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd);
-  }
-
-  /**
    * Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls
    * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the
    * job conf.
@@ -353,16 +99,16 @@ public abstract class AbstractRealtimeRecordReader {
     List<String> partitioningFields =
         partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
             : new ArrayList<>();
-    writerSchema = addPartitionFields(writerSchema, partitioningFields);
-    List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
+    writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
+    List<String> projectionFields = HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
         jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);
 
-    Map<String, Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
+    Map<String, Field> schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
     hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
     // TODO(vc): In the future, the reader schema should be updated based on log files & be able
     // to null out fields not present before
 
-    readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
+    readerSchema = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
     LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
         split.getDeltaLogPaths(), split.getPath(), projectionFields));
   }
@@ -409,7 +155,8 @@ public abstract class AbstractRealtimeRecordReader {
   public long getMaxCompactionMemoryInBytes() {
     // jobConf.getMemoryForMapTask() returns in MB
     return (long) Math
-        .ceil(Double.parseDouble(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION))
+        .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
+            HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION))
             * jobConf.getMemoryForMapTask() * 1024 * 1024L);
   }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
index bdf11ed..7a0bd37 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -65,7 +66,7 @@ public class HoodieCombineRealtimeRecordReader implements RecordReader<NullWrita
   public boolean next(NullWritable key, ArrayWritable value) throws IOException {
     if (this.currentRecordReader.next(key, value)) {
       LOG.info("Reading from record reader");
-      LOG.info(AbstractRealtimeRecordReader.arrayWritableToString(value));
+      LOG.info(HoodieRealtimeRecordReaderUtils.arrayWritableToString(value));
       return true;
     } else if (recordReaders.size() > 0) {
       this.currentRecordReader.close();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index ae3fb5c..1124791 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -18,27 +18,17 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -52,13 +42,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -70,11 +54,6 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
 
   private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
 
-  // These positions have to be deterministic across all tables
-  public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
-  public static final int HOODIE_RECORD_KEY_COL_POS = 2;
-  public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
-  public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
   // To make Hive on Spark queries work with RT tables. Our theory is that due to
   // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
   // not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
@@ -85,74 +64,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
 
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
 
-    // obtain all unique parent folders for splits
-    Map<Path, List<FileSplit>> partitionsToParquetSplits =
-        fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
-    // TODO(vc): Should we handle also non-hoodie splits here?
-    Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
-    Map<Path, HoodieTableMetaClient> partitionsToMetaClient =
-        partitionsToParquetSplits.keySet().stream().collect(Collectors.toMap(Function.identity(), p -> {
-          // find if we have a metaclient already for this partition.
-          Option<String> matchingBasePath = Option.fromJavaOptional(
-              metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst());
-          if (matchingBasePath.isPresent()) {
-            return metaClientMap.get(matchingBasePath.get());
-          }
-
-          try {
-            HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p);
-            metaClientMap.put(metaClient.getBasePath(), metaClient);
-            return metaClient;
-          } catch (IOException e) {
-            throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
-          }
-        }));
-
-    // for all unique split parents, obtain all delta files based on delta commit timeline,
-    // grouped on file id
-    List<HoodieRealtimeFileSplit> rtSplits = new ArrayList<>();
-    partitionsToParquetSplits.keySet().forEach(partitionPath -> {
-      // for each partition path obtain the data & log file groupings, then map back to inputsplits
-      HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
-      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
-      String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
-
-      try {
-        // Both commit and delta-commits are included - pick the latest completed one
-        Option<HoodieInstant> latestCompletedInstant =
-            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
-
-        Stream<FileSlice> latestFileSlices = latestCompletedInstant
-            .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
-            .orElse(Stream.empty());
-
-        // subgroup splits again by file id & match with log files.
-        Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
-            .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
-        latestFileSlices.forEach(fileSlice -> {
-          List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
-          dataFileSplits.forEach(split -> {
-            try {
-              List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
-              // Get the maxCommit from the last delta or compaction or commit - when
-              // bootstrapped from COW table
-              String maxCommitTime = metaClient
-                  .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
-                      HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
-                  .filterCompletedInstants().lastInstant().get().getTimestamp();
-              rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
-            } catch (IOException e) {
-              throw new HoodieIOException("Error creating hoodie real time split ", e);
-            }
-          });
-        });
-      } catch (Exception e) {
-        throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
-      }
-    });
-    LOG.info("Returning a total splits of " + rtSplits.size());
-    return rtSplits.toArray(new InputSplit[0]);
+    return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
   @Override
@@ -199,9 +111,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
 
   private static void addRequiredProjectionFields(Configuration configuration) {
     // Need this to do merge records in HoodieRealtimeRecordReader
-    addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
-    addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS);
-    addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
+    addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
+    addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
+    addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
   }
 
   /**
@@ -228,12 +140,12 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
     // latency incurred here due to the synchronization since get record reader is called once per spilt before the
     // actual heavy lifting of reading the parquet files happen.
-    if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
+    if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
       synchronized (jobConf) {
         LOG.info(
             "Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
                 + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-        if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
+        if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
           // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
           // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
           // hoodie additional projection columns are reset after calling setConf and only natural projections
@@ -245,7 +157,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
           addRequiredProjectionFields(jobConf);
 
           this.conf = jobConf;
-          this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
+          this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
         }
       }
     }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 73e0b50..02bb5eb 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -24,6 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.io.ArrayWritable;
@@ -60,13 +63,17 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
     // but can return records for completed commits > the commit we are trying to read (if using
     // readCommit() API)
-    return new HoodieMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf), split.getBasePath(),
-        split.getDeltaLogPaths(), usesCustomPayload ? getWriterSchema() : getReaderSchema(), split.getMaxCommitTime(),
+    return new HoodieMergedLogRecordScanner(
+        FSUtils.getFs(split.getPath().toString(), jobConf),
+        split.getBasePath(),
+        split.getDeltaLogPaths(),
+        usesCustomPayload ? getWriterSchema() : getReaderSchema(),
+        split.getMaxCommitTime(),
         getMaxCompactionMemoryInBytes(),
-        Boolean
-            .parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
-        false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
-        jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH));
+        Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
+        false,
+        jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
+        jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH));
   }
 
   @Override
@@ -81,7 +88,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
       // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
       // would be true until we have a way to index logs too)
       // return from delta records map if we have some match.
-      String key = arrayWritable.get()[HoodieParquetRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS].toString();
+      String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
       if (deltaRecordMap.containsKey(key)) {
         // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
         // deltaRecord may not be a full record and needs values of columns from the parquet
@@ -105,11 +112,11 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         // we assume, a later safe record in the log, is newer than what we have in the map &
         // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
         // schema, we use writerSchema to create the arrayWritable from the latest generic record
-        ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
+        ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
         Writable[] replaceValue = aWritable.get();
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable),
-              arrayWritableToString(aWritable)));
+          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
+              HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
         }
         Writable[] originalValue = arrayWritable.get();
         try {
@@ -117,10 +124,10 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
           arrayWritable.set(originalValue);
         } catch (RuntimeException re) {
           LOG.error("Got exception when doing array copy", re);
-          LOG.error("Base record :" + arrayWritableToString(arrayWritable));
-          LOG.error("Log record :" + arrayWritableToString(aWritable));
-          String errMsg = "Base-record :" + arrayWritableToString(arrayWritable)
-              + " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
+          LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
+          LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
+          String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
+              + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
           throw new RuntimeException(errMsg, re);
         }
       }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 4c773d4..8bc1cfb 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -37,6 +37,8 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.hadoop.RecordReaderValueIterator;
 import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
 class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader<NullWritable, ArrayWritable> {
@@ -76,11 +78,11 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
     this.iterator = this.executor.getQueue().iterator();
     this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf),
         split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(),
-        Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
-        false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
+        Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
+        false, jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
           // convert Hoodie log record to Hadoop AvroWritable and buffer
           GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();
-          ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, getHiveSchema());
+          ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
           this.executor.getQueue().insertRecord(aWritable);
     });
     // Start reading and buffering
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
similarity index 98%
rename from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
rename to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
index 0537cfa..02fb9d0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hadoop;
+package org.apache.hudi.hadoop.utils;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -31,9 +31,9 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-public class HoodieHiveUtil {
+public class HoodieHiveUtils {
 
-  public static final Logger LOG = LogManager.getLogger(HoodieHiveUtil.class);
+  public static final Logger LOG = LogManager.getLogger(HoodieHiveUtils.class);
 
   public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
   public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
similarity index 58%
copy from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
copy to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 5a10f3c..d10b664 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -16,21 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hadoop;
+package org.apache.hudi.hadoop.utils;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
@@ -39,85 +26,40 @@ import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
-/**
- * HoodieInputFormat which understands the Hoodie File Structure and filters files based on the Hoodie Mode. If paths
- * that does not correspond to a hoodie table then they are passed in as is (as what FileInputFormat.listStatus()
- * would do). The JobConf could have paths from multipe Hoodie/Non-Hoodie tables
- */
-@UseFileSplitsFromInputFormat
-public class HoodieParquetInputFormat extends MapredParquetInputFormat implements Configurable {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
+public class HoodieInputFormatUtils {
 
-  protected Configuration conf;
-
-  @Override
-  public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
-    List<String> incrementalTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job));
-    InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
-    List<FileStatus> returns = new ArrayList<>();
+  // These positions have to be deterministic across all tables
+  public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
+  public static final int HOODIE_RECORD_KEY_COL_POS = 2;
+  public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
+  public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
 
-    Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
-    // process incremental pulls first
-    for (String table : incrementalTables) {
-      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
-      if (metaClient == null) {
-        /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
-         * in the jobConf
-         */
-        continue;
-      }
-      List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
-      List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
-      if (result != null) {
-        returns.addAll(result);
-      }
-    }
-
-    // process non hoodie Paths next.
-    List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
-    if (nonHoodiePaths.size() > 0) {
-      setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
-      FileStatus[] fileStatuses = super.listStatus(job);
-      returns.addAll(Arrays.asList(fileStatuses));
-    }
-
-    // process snapshot queries next.
-    List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
-    if (snapshotPaths.size() > 0) {
-      setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
-      FileStatus[] fileStatuses = super.listStatus(job);
-      Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
-          groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
-      LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
-      for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
-        List<FileStatus> result = filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue());
-        if (result != null) {
-          returns.addAll(result);
-        }
-      }
-    }
-    return returns.toArray(new FileStatus[returns.size()]);
-  }
+  private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
 
   /**
    * Filter any specific instants that we do not want to process.
@@ -132,16 +74,20 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
    *
    * To workaround this problem, we want to stop returning data belonging to commits > t2.
    * After compaction is complete, incremental reader would see updates in t2, t3, so on.
+   * @param timeline
+   * @return
    */
-  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+  public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
     HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
-    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant();
+    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline
+        .filterPendingCompactionTimeline().firstInstant();
     if (pendingCompactionInstant.isPresent()) {
-      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline.findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
+      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline
+          .findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
       int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
           - instantsTimeline.getCommitsTimeline().countInstants();
       LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
-              + " skipping " + numCommitsFilteredByCompaction + " commits");
+          + " skipping " + numCommitsFilteredByCompaction + " commits");
 
       return instantsTimeline;
     } else {
@@ -150,29 +96,18 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
   }
 
   /**
-   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
-   * partitions and then filtering based on the commits of interest, this logic first extracts the
-   * partitions touched by the desired commits and then lists only those partitions.
+   * Extract partitions touched by the commitsToCheck.
+   * @param commitsToCheck
+   * @param tableMetaClient
+   * @param timeline
+   * @param inputPaths
+   * @return
+   * @throws IOException
    */
-  private List<FileStatus> listStatusForIncrementalMode(
-      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
-    String tableName = tableMetaClient.getTableConfig().getTableName();
-    Job jobContext = Job.getInstance(job);
-    HoodieDefaultTimeline baseTimeline;
-    if (HoodieHiveUtil.stopAtCompaction(jobContext, tableName)) {
-      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
-    } else {
-      baseTimeline = tableMetaClient.getActiveTimeline();
-    }
-
-    HoodieTimeline timeline = baseTimeline.getCommitsTimeline().filterCompletedInstants();
-    String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext, tableName);
-    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
-    Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName);
-    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
-    List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
-        .getInstants().collect(Collectors.toList());
-    // Extract partitions touched by the commitsToCheck
+  public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck,
+                                      HoodieTableMetaClient tableMetaClient,
+                                      HoodieTimeline timeline,
+                                      List<Path> inputPaths) throws IOException {
     Set<String> partitionsToList = new HashSet<>();
     for (HoodieInstant commit : commitsToCheck) {
       HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
@@ -180,7 +115,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
     }
     if (partitionsToList.isEmpty()) {
-      return null;
+      return Option.empty();
     }
     String incrementalInputPaths = partitionsToList.stream()
         .map(s -> StringUtils.isNullOrEmpty(s) ? tableMetaClient.getBasePath() : tableMetaClient.getBasePath() + Path.SEPARATOR + s)
@@ -211,19 +146,105 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
           return false;
         })
         .collect(Collectors.joining(","));
-    if (StringUtils.isNullOrEmpty(incrementalInputPaths)) {
-      return null;
+    return Option.of(incrementalInputPaths);
+  }
+
+  /**
+   * Extract HoodieTimeline based on HoodieTableMetaClient.
+   * @param job
+   * @param tableMetaClient
+   * @return
+   */
+  public static Option<HoodieTimeline> getFilteredCommitsTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    HoodieDefaultTimeline baseTimeline;
+    if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
+      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
+    } else {
+      baseTimeline = tableMetaClient.getActiveTimeline();
+    }
+    return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
+  }
+
+  /**
+   * Picks the instants an incremental query reads, using the settings HoodieHiveUtils reads from the job.
+   * @param job job context handed to HoodieHiveUtils to read the start commit time and max commits
+   * @param tableName name of the table being queried
+   * @param timeline timeline the instants are taken from
+   * @return the instants after the start commit time, limited to the max commits read from the job
+   */
+  public static Option<List<HoodieInstant>> getCommitsForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) {
+    String startCommitTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
+    // Upper bound on the number of commits returned in this batch; -1 returns all the commits.
+    Integer commitLimit = HoodieHiveUtils.readMaxCommits(job, tableName);
+    LOG.info("Last Incremental timestamp was set as " + startCommitTs);
+    List<HoodieInstant> instants = timeline.findInstantsAfter(startCommitTs, commitLimit).getInstants().collect(Collectors.toList());
+    return Option.of(instants);
+  }
+
+  /**
+   * Resolves the meta client of every partition path, reusing one client for all partitions of a table.
+   * @param conf configuration used to obtain the file system of each partition path
+   * @param partitions partition paths to resolve
+   * @return map from partition path to its table's meta client; HoodieIOException if one cannot be created
+   */
+  public static Map<Path, HoodieTableMetaClient> getTableMetaClientByBasePath(Configuration conf, Set<Path> partitions) {
+    Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
+    return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
+      // Reuse a cached client only for its base path or paths below it, not for a sibling such as <base>2.
+      String path = p.toString();
+      for (Map.Entry<String, HoodieTableMetaClient> cached : metaClientMap.entrySet()) {
+        if (path.equals(cached.getKey()) || path.startsWith(cached.getKey() + Path.SEPARATOR)) {
+          return cached.getValue();
+        }
+      }
+      try {
+        HoodieTableMetaClient metaClient = getTableMetaClientForBasePath(p.getFileSystem(conf), p);
+        metaClientMap.put(metaClient.getBasePath(), metaClient);
+        return metaClient;
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
+      }
+    }));
+  }
+
+  /**
+   * Extract HoodieTableMetaClient from a partition path (not the base path).
+   * @param fs
+   * @param dataPath
+   * @return
+   * @throws IOException
+   */
+  public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
+    int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
+    if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
+      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
+      metadata.readFromFS();
+      levels = metadata.getPartitionDepth();
     }
-    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
-    setInputPaths(job, incrementalInputPaths);
-    FileStatus[] fileStatuses = super.listStatus(job);
-    BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
+    Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
+    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
+    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
+  }
+
+  /**
+   * Filter a list of FileStatus based on commitsToCheck for incremental view.
+   * @param job
+   * @param tableMetaClient
+   * @param timeline
+   * @param fileStatuses
+   * @param commitsToCheck
+   * @return
+   */
+  public static List<FileStatus> filterIncrementalFileStatus(Job job, HoodieTableMetaClient tableMetaClient,
+      HoodieTimeline timeline, FileStatus[] fileStatuses, List<HoodieInstant> commitsToCheck) {
+    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
     List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
     List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
     List<FileStatus> returns = new ArrayList<>();
     for (HoodieBaseFile filteredFile : filteredFiles) {
       LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
-      filteredFile = checkFileStatus(filteredFile);
+      filteredFile = refreshFileStatus(job.getConfiguration(), filteredFile);
       returns.add(filteredFile.getFileStatus());
     }
     LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
@@ -238,7 +259,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
    * @return
    * @throws IOException
    */
-  private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
+  public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
       FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) {
     // This assumes the paths for different tables are grouped together
     Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
@@ -268,16 +289,20 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
 
   /**
    * Filters data files for a snapshot queried table.
+   * @param job
+   * @param metadata
+   * @param fileStatuses
+   * @return
    */
-  private List<FileStatus> filterFileStatusForSnapshotMode(
-      HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
+  public static List<FileStatus> filterFileStatusForSnapshotMode(
+      JobConf job, HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
     FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
     }
     // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
     HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
+    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
     // filter files on the latest commit found
     List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
     LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
@@ -286,7 +311,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
       }
-      filteredFile = checkFileStatus(filteredFile);
+      filteredFile = refreshFileStatus(job, filteredFile);
       returns.add(filteredFile.getFileStatus());
     }
     return returns;
@@ -296,8 +321,11 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
    * Checks the file status for a race condition which can set the file size to 0. 1. HiveInputFormat does
    * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed.
    * 3. Generation of splits looks at FileStatus size to create splits, which skips this file
+   * @param conf
+   * @param dataFile
+   * @return
    */
-  private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) {
+  private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
     Path dataPath = dataFile.getFileStatus().getPath();
     try {
       if (dataFile.getFileSize() == 0) {
@@ -311,46 +339,4 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     }
   }
 
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
-      final Reporter reporter) throws IOException {
-    // TODO enable automatic predicate pushdown after fixing issues
-    // FileSplit fileSplit = (FileSplit) split;
-    // HoodieTableMetadata metadata = getTableMetadata(fileSplit.getPath().getParent());
-    // String tableName = metadata.getTableName();
-    // String mode = HoodieHiveUtil.readMode(job, tableName);
-
-    // if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
-    // FilterPredicate predicate = constructHoodiePredicate(job, tableName, split);
-    // LOG.info("Setting parquet predicate push down as " + predicate);
-    // ParquetInputFormat.setFilterPredicate(job, predicate);
-    // clearOutExistingPredicate(job);
-    // }
-    return super.getRecordReader(split, job, reporter);
-  }
-
-  /**
-   * Read the table metadata from a data path. This assumes certain hierarchy of files which should be changed once a
-   * better way is figured out to pass in the hoodie meta directory
-   */
-  protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) throws IOException {
-    int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH;
-    if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
-      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
-      metadata.readFromFS();
-      levels = metadata.getPartitionDepth();
-    }
-    Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
-    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
-    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
-  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
new file mode 100644
index 0000000..7ae4ea0
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Static helpers that pair base files with the log files of their file slices for realtime queries. */
+public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
+
+  /**
+   * Turns base-file splits into realtime splits that also carry the log file paths of the latest merged file
+   * slice with the same file id, the table base path and the last completed commit/rollback/delta-commit time.
+   * @param conf configuration used to create the meta client of each partition path
+   * @param fileSplits base-file splits; they are grouped by the parent directory of their path
+   * @return one {@link HoodieRealtimeFileSplit} for every input split whose file id has a latest file slice
+   * @throws IOException declared by the signature; failures in here are rethrown as {@link HoodieException}
+   */
+  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+    Map<Path, List<FileSplit>> partitionsToParquetSplits = fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
+    // TODO(vc): Should we handle also non-hoodie splits here?
+    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
+
+    // for each partition path obtain the data & log file groupings, then map back to inputsplits
+    List<HoodieRealtimeFileSplit> rtSplits = new ArrayList<>();
+    partitionsToParquetSplits.keySet().forEach(partitionPath -> {
+      HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+      String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
+
+      try {
+        // Both commit and delta-commits are included - pick the latest completed one
+        Option<HoodieInstant> latestCompletedInstant = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+        Stream<FileSlice> latestFileSlices = latestCompletedInstant
+            .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
+            .orElse(Stream.empty());
+
+        // subgroup splits again by file id & match with log files; a slice without any input split is skipped
+        Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
+            .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
+        latestFileSlices.forEach(fileSlice -> {
+          List<FileSplit> dataFileSplits = groupedInputSplits.getOrDefault(fileSlice.getFileId(), new ArrayList<>());
+          dataFileSplits.forEach(split -> {
+            try {
+              List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+                  .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
+              // Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table
+              String maxCommitTime = metaClient
+                  .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+                      HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+                  .filterCompletedInstants().lastInstant().get().getTimestamp();
+              rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
+            } catch (IOException e) {
+              throw new HoodieIOException("Error creating hoodie real time split ", e);
+            }
+          });
+        });
+      } catch (Exception e) {
+        throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
+      }
+    });
+    LOG.info("Returning a total splits of " + rtSplits.size());
+    return rtSplits.toArray(new InputSplit[0]);
+  }
+
+  /**
+   * Maps each given base file to the sorted log file paths of the latest merged file slice that has the same
+   * file id. Base files whose file id has no latest file slice get no entry in the result.
+   * @param conf configuration used to create the meta client of each partition path
+   * @param fileStatuses base files; they are grouped by the parent directory of their path
+   * @return map from base file path to the log file paths of its file slice
+   */
+  public static Map<String, List<String>> groupLogsByBaseFile(Configuration conf, Stream<FileStatus> fileStatuses) {
+    Map<Path, List<FileStatus>> partitionsToParquetSplits = fileStatuses.collect(Collectors.groupingBy(file -> file.getPath().getParent()));
+    // TODO(vc): Should we handle also non-hoodie splits here?
+    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
+
+    Map<String, List<String>> resultMap = new HashMap<>();
+    partitionsToParquetSplits.keySet().forEach(partitionPath -> {
+      HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+      String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
+
+      try {
+        // Both commit and delta-commits are included - pick the latest completed one
+        Option<HoodieInstant> latestCompletedInstant = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+        Stream<FileSlice> latestFileSlices = latestCompletedInstant
+            .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
+            .orElse(Stream.empty());
+
+        // subgroup base files again by file id & match with log files; a slice without any base file is skipped
+        Map<String, List<FileStatus>> groupedBaseFiles = partitionsToParquetSplits.get(partitionPath).stream()
+            .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getPath().getName())));
+        latestFileSlices.forEach(fileSlice -> {
+          List<FileStatus> baseFiles = groupedBaseFiles.getOrDefault(fileSlice.getFileId(), new ArrayList<>());
+          baseFiles.forEach(baseFile -> {
+            List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+                .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
+            resultMap.put(baseFile.getPath().toString(), logFilePaths);
+          });
+        });
+      } catch (Exception e) {
+        throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
+      }
+    });
+    return resultMap;
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
similarity index 58%
copy from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
copy to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 5f675bb..6af3770 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -16,26 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hadoop.realtime;
+package org.apache.hudi.hadoop.utils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.LogReaderUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.io.ArrayWritable;
@@ -46,10 +40,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;
 
@@ -63,67 +53,13 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-/**
- * Record Reader implementation to merge fresh avro data with base parquet data, to support real time queries.
- */
-public abstract class AbstractRealtimeRecordReader {
-
-  // Fraction of mapper/reducer task memory used for compaction of log files
-  public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
-  public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
-  // used to choose a trade off between IO vs Memory when performing compaction process
-  // Depending on outputfile size and memory provided, choose true to avoid OOM for large file
-  // size + small memory
-  public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
-  public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
-
-  // Property to set the max memory for dfs inputstream buffer size
-  public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
-  // Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper
-  public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB
-  // Property to set file path prefix for spillable file
-  public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path";
-  // Default file path prefix for spillable file
-  public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
-
-  private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
-
-  protected final HoodieRealtimeFileSplit split;
-  protected final JobConf jobConf;
-  private final MessageType baseFileSchema;
-  protected final boolean usesCustomPayload;
-  // Schema handles
-  private Schema readerSchema;
-  private Schema writerSchema;
-  private Schema hiveSchema;
-
-  public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
-    this.split = split;
-    this.jobConf = job;
-    LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
-    LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-    LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
-    try {
-      this.usesCustomPayload = usesCustomPayload();
-      LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
-      baseFileSchema = readSchema(jobConf, split.getPath());
-      init();
-    } catch (IOException e) {
-      throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
-    }
-  }
-
-  private boolean usesCustomPayload() {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, split.getBasePath());
-    return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName())
-        || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload"));
-  }
+public class HoodieRealtimeRecordReaderUtils {
 
   /**
    * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the twitter parquet to
    * support hive 1.1.0
    */
-  private static MessageType readSchema(Configuration conf, Path parquetFilePath) {
+  public static MessageType readSchema(Configuration conf, Path parquetFilePath) {
     try {
       return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema();
     } catch (IOException e) {
@@ -134,7 +70,7 @@ public abstract class AbstractRealtimeRecordReader {
   /**
    * Prints a JSON representation of the ArrayWritable for easier debuggability.
    */
-  protected static String arrayWritableToString(ArrayWritable writable) {
+  public static String arrayWritableToString(ArrayWritable writable) {
     if (writable == null) {
       return "null";
     }
@@ -161,42 +97,10 @@ public abstract class AbstractRealtimeRecordReader {
   }
 
   /**
-   * Given a comma separated list of field names and positions at which they appear on Hive, return
-   * an ordered list of field names, that can be passed onto storage.
-   */
-  private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
-    // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
-    // handles duplicate fields orders correctly.
-    // Fields Orders -> {@link https://github
-    // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
-    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
-    // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
-    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
-    String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
-    Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
-    String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
-    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
-        .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
-    Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
-    // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
-    if (fieldNamesSet.size() != fieldOrders.length) {
-      throw new HoodieException(String
-          .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
-              fieldNames.size(), fieldOrders.length));
-    }
-    TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
-    String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]);
-    for (int ox = 0; ox < fieldOrders.length; ox++) {
-      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
-    }
-    return new ArrayList<>(orderedFieldMap.values());
-  }
-
-  /**
    * Generate a reader schema off the provided writeSchema, to just project out the provided columns.
    */
-  public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Field> schemaFieldsMap,
-      List<String> fieldNames) {
+  public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Schema.Field> schemaFieldsMap,
+                                                List<String> fieldNames) {
     /**
      * Avro & Presto field names seems to be case sensitive (support fields differing only in case) whereas
      * Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using
@@ -224,7 +128,7 @@ public abstract class AbstractRealtimeRecordReader {
     return projectedSchema;
   }
 
-  public static Map<String, Field> getNameToFieldMap(Schema schema) {
+  public static Map<String, Schema.Field> getNameToFieldMap(Schema schema) {
     return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r))
         .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
   }
@@ -305,10 +209,10 @@ public abstract class AbstractRealtimeRecordReader {
         if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) {
           LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema);
           HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(),
-                  decimal.getScale());
+              decimal.getScale());
           return HiveDecimalWritable.enforcePrecisionScale(writable,
-                  decimal.getPrecision(),
-                  decimal.getScale());
+              decimal.getPrecision(),
+              decimal.getScale());
         }
         return new BytesWritable(((GenericFixed) value).bytes());
       default:
@@ -317,6 +221,38 @@ public abstract class AbstractRealtimeRecordReader {
   }
 
   /**
+   * Given a comma separated list of field names and positions at which they appear on Hive, return
+   * an ordered list of field names, that can be passed onto storage.
+   */
+  public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
+    // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
+    // handles duplicate fields orders correctly.
+    // Fields Orders -> {@link https://github
+    // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
+    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
+    // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
+    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
+    String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
+    Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
+    String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
+    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
+        .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
+    Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
+    // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
+    if (fieldNamesSet.size() != fieldOrders.length) {
+      throw new HoodieException(String
+          .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
+              fieldNames.size(), fieldOrders.length));
+    }
+    TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
+    String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]);
+    for (int ox = 0; ox < fieldOrders.length; ox++) {
+      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
+    }
+    return new ArrayList<>(orderedFieldMap.values());
+  }
+
+  /**
    * Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file to
    * also be part of the projected schema. Hive expects the record reader implementation to return the row in its
    * entirety (with un-projected column having null values). As we use writerSchema for this, make sure writer schema
@@ -324,92 +260,12 @@ public abstract class AbstractRealtimeRecordReader {
    *
    * @param schema Schema to be changed
    */
-  private static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
+  public static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
     final Set<String> firstLevelFieldNames =
-        schema.getFields().stream().map(Field::name).map(String::toLowerCase).collect(Collectors.toSet());
+        schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toSet());
     List<String> fieldsToAdd = partitioningFields.stream().map(String::toLowerCase)
         .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList());
 
     return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd);
   }
-
-  /**
-   * Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls
-   * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the
-   * job conf.
-   */
-  private void init() throws IOException {
-    Schema schemaFromLogFile =
-        LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
-    if (schemaFromLogFile == null) {
-      writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
-      LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
-    } else {
-      writerSchema = schemaFromLogFile;
-      LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
-    }
-    // Add partitioning fields to writer schema for resulting row to contain null values for these fields
-    String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
-    List<String> partitioningFields =
-        partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
-            : new ArrayList<>();
-    writerSchema = addPartitionFields(writerSchema, partitioningFields);
-    List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
-        jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);
-
-    Map<String, Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
-    hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
-    // TODO(vc): In the future, the reader schema should be updated based on log files & be able
-    // to null out fields not present before
-
-    readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
-    LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
-        split.getDeltaLogPaths(), split.getPath(), projectionFields));
-  }
-
-  private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
-    // Get all column names of hive table
-    String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
-    LOG.info("Hive Columns : " + hiveColumnString);
-    String[] hiveColumns = hiveColumnString.split(",");
-    LOG.info("Hive Columns : " + hiveColumnString);
-    List<Field> hiveSchemaFields = new ArrayList<>();
-
-    for (String columnName : hiveColumns) {
-      Field field = schemaFieldsMap.get(columnName.toLowerCase());
-
-      if (field != null) {
-        hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
-      } else {
-        // Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
-        // They will get skipped as they won't be found in the original schema.
-        LOG.debug("Skipping Hive Column => " + columnName);
-      }
-    }
-
-    Schema hiveSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(), writerSchema.getNamespace(),
-        writerSchema.isError());
-    hiveSchema.setFields(hiveSchemaFields);
-    LOG.info("HIVE Schema is :" + hiveSchema.toString(true));
-    return hiveSchema;
-  }
-
-  public Schema getReaderSchema() {
-    return readerSchema;
-  }
-
-  public Schema getWriterSchema() {
-    return writerSchema;
-  }
-
-  public Schema getHiveSchema() {
-    return hiveSchema;
-  }
-
-  public long getMaxCompactionMemoryInBytes() {
-    // jobConf.getMemoryForMapTask() returns in MB
-    return (long) Math
-        .ceil(Double.parseDouble(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION))
-            * jobConf.getMemoryForMapTask() * 1024 * 1024L);
-  }
 }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index ad38d33..aa9d828 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileStatus;
@@ -103,7 +104,7 @@ public class TestHoodieParquetInputFormat {
     timeline.setInstants(instants);
 
     // Verify getCommitsTimelineBeforePendingCompaction does not return instants after first compaction instant
-    HoodieTimeline filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
+    HoodieTimeline filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
     assertTrue(filteredTimeline.containsInstant(t1));
     assertTrue(filteredTimeline.containsInstant(t2));
     assertFalse(filteredTimeline.containsInstant(t3));
@@ -116,7 +117,7 @@ public class TestHoodieParquetInputFormat {
     instants.remove(t3);
     timeline = new HoodieActiveTimeline(metaClient);
     timeline.setInstants(instants);
-    filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
+    filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
 
     // verify all remaining instants are returned.
     assertTrue(filteredTimeline.containsInstant(t1));
@@ -130,7 +131,7 @@ public class TestHoodieParquetInputFormat {
     instants.remove(t5);
     timeline = new HoodieActiveTimeline(metaClient);
     timeline.setInstants(instants);
-    filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
+    filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
 
     // verify all remaining instants are returned.
     assertTrue(filteredTimeline.containsInstant(t1));
@@ -267,7 +268,7 @@ public class TestHoodieParquetInputFormat {
     ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit", files, "300", 1);
     ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit", files, "200", 1);
 
-    InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtil.MAX_COMMIT_ALL);
+    InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtils.MAX_COMMIT_ALL);
     files = inputFormat.listStatus(jobConf);
 
     assertEquals(5, files.length,
@@ -312,15 +313,15 @@ public class TestHoodieParquetInputFormat {
   public void testGetIncrementalTableNames() throws IOException {
     String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
     JobConf conf = new JobConf();
-    String incrementalMode1 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
-    conf.set(incrementalMode1, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
-    String incrementalMode2 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
-    conf.set(incrementalMode2,HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
-    String incrementalMode3 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
-    conf.set(incrementalMode3, HoodieHiveUtil.INCREMENTAL_SCAN_MODE.toLowerCase());
-    String defaultmode = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
-    conf.set(defaultmode, HoodieHiveUtil.DEFAULT_SCAN_MODE);
-    List<String> actualincrTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(conf));
+    String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
+    conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
+    String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
+    conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
+    String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
+    conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
+    String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
+    conf.set(defaultmode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
+    List<String> actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
     for (String expectedincrTable : expectedincrTables) {
       assertTrue(actualincrTables.contains(expectedincrTable));
     }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 3e63fef..5a735ce 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
@@ -80,7 +81,7 @@ public class TestHoodieRealtimeRecordReader {
   @BeforeEach
   public void setUp() {
     jobConf = new JobConf();
-    jobConf.set(AbstractRealtimeRecordReader.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
+    jobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
     hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
     fs = FSUtils.getFs(basePath.toString(), hadoopConf);
   }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 0c6ed7b..05669bb 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.hadoop.HoodieHiveUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -104,15 +104,15 @@ public class InputFormatTestUtil {
 
   public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
     String modePropertyName =
-        String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
-    jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+        String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
 
     String startCommitTimestampName =
-        String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.set(startCommitTimestampName, startCommit);
 
     String maxCommitPulls =
-        String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
   }