You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/04/21 03:40:14 UTC

[incubator-hudi] branch master updated: [HUDI-371] Supporting hive combine input format for realtime tables (#1503)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 332072b  [HUDI-371] Supporting hive combine input format for realtime tables (#1503)
332072b is described below

commit 332072bc6d5fd04e9325d21e55dfac66bc8f848a
Author: n3nash <na...@uber.com>
AuthorDate: Mon Apr 20 20:40:06 2020 -0700

    [HUDI-371] Supporting hive combine input format for realtime tables (#1503)
---
 .../hudi/common/HoodieMergeOnReadTestUtils.java    |   4 +-
 .../hudi/common/util/collection/ArrayUtils.java    |  62 ++
 hudi-hadoop-mr/pom.xml                             |   2 +-
 .../hadoop/hive/HoodieCombineHiveInputFormat.java  | 626 ++++++++++++---------
 .../hive/HoodieCombineRealtimeFileSplit.java       | 169 ++++++
 .../hive/HoodieCombineRealtimeHiveSplit.java       |  44 ++
 .../realtime/AbstractRealtimeRecordReader.java     |   3 +
 .../HoodieCombineRealtimeRecordReader.java         | 103 ++++
 .../realtime/HoodieParquetRealtimeInputFormat.java |   2 +-
 .../realtime/HoodieRealtimeRecordReader.java       |   1 +
 .../realtime/RealtimeUnmergedRecordReader.java     |  22 +-
 .../apache/hudi/hadoop/InputFormatTestUtil.java    | 100 ++++
 .../hudi/hadoop/TestHoodieParquetInputFormat.java  |  28 +-
 .../realtime/TestHoodieCombineHiveInputFormat.java | 160 ++++++
 .../realtime/TestHoodieRealtimeRecordReader.java   |  91 +--
 15 files changed, 1045 insertions(+), 372 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java
index 24430fb..1a65a46 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java
@@ -50,7 +50,7 @@ public class HoodieMergeOnReadTestUtils {
   }
 
   public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath,
-                                                                Configuration conf) {
+      Configuration conf) {
     JobConf jobConf = new JobConf(conf);
     return getRecordsUsingInputFormat(inputPaths, basePath, jobConf, new HoodieParquetRealtimeInputFormat());
   }
@@ -125,4 +125,4 @@ public class HoodieMergeOnReadTestUtils {
     jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
     jobConf.set("map.input.dir", inputPath);
   }
-}
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ArrayUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ArrayUtils.java
new file mode 100644
index 0000000..cc76c9d
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ArrayUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+/**
+ * <p>Operations on arrays, primitive arrays (like {@code int[]}) and
+ * primitive wrapper arrays (like {@code Integer[]}).</p>
+ *
+ * <p>This class tries to handle {@code null} input gracefully.
+ * An exception will not be thrown for a {@code null}
+ * array input. However, an Object array that contains a {@code null}
+ * element may throw an exception. Each method documents its behaviour.</p>
+ *
+ * NOTE : Adapted from org.apache.commons.lang3.ArrayUtils
+ */
+public class ArrayUtils {
+
+  /**
+   * An empty immutable {@code long} array.
+   */
+  public static final long[] EMPTY_LONG_ARRAY = new long[0];
+
+  // Long array converters
+  // ----------------------------------------------------------------------
+  /**
+   * <p>Converts an array of object Longs to primitives.</p>
+   *
+   * <p>Returns {@code null} for a {@code null} input and the shared {@link #EMPTY_LONG_ARRAY} for an empty one.</p>
+   *
+   * @param array  a {@code Long} array, may be {@code null}
+   * @return a {@code long} array, {@code null} if null array input
+   * @throws NullPointerException if any element of the array is {@code null}
+   */
+  public static long[] toPrimitive(Long[] array) {
+    if (array == null) {
+      return null;
+    } else if (array.length == 0) {
+      return EMPTY_LONG_ARRAY;
+    }
+    final long[] result = new long[array.length];
+    for (int i = 0; i < array.length; i++) {
+      result[i] = array[i].longValue();
+    }
+    return result;
+  }
+}
diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml
index a77932c..8d21bda 100644
--- a/hudi-hadoop-mr/pom.xml
+++ b/hudi-hadoop-mr/pom.xml
@@ -150,4 +150,4 @@
       </plugin>
     </plugins>
   </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index 27a8318..9f024e9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -18,7 +18,11 @@
 
 package org.apache.hudi.hadoop.hive;
 
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,11 +32,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.CombineHiveRecordReader;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
 import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -45,9 +51,11 @@ import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure.InputSplitShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -67,6 +75,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -99,250 +108,6 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
   private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
   private static final int DEFAULT_NUM_PATH_PER_THREAD = 100;
 
-  private class CheckNonCombinablePathCallable implements Callable<Set<Integer>> {
-
-    private final Path[] paths;
-    private final int start;
-    private final int length;
-    private final JobConf conf;
-
-    public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
-      this.paths = paths;
-      this.start = start;
-      this.length = length;
-      this.conf = conf;
-    }
-
-    @Override
-    public Set<Integer> call() throws Exception {
-      Set<Integer> nonCombinablePathIndices = new HashSet<>();
-      for (int i = 0; i < length; i++) {
-        PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo,
-            paths[i + start], IOPrepareCache.get().allocatePartitionDescMap());
-        // Use HiveInputFormat if any of the paths is not splittable
-        Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
-        InputFormat<WritableComparable, Writable> inputFormat = getInputFormatFromCache(inputFormatClass, conf);
-        if (inputFormat instanceof AvoidSplitCombination
-            && ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The path [" + paths[i + start] + "] is being parked for HiveInputFormat.getSplits");
-          }
-          nonCombinablePathIndices.add(i + start);
-        }
-      }
-      return nonCombinablePathIndices;
-    }
-  }
-
-  /**
-   * CombineHiveInputSplit encapsulates an InputSplit with its corresponding inputFormatClassName. A
-   * CombineHiveInputSplit comprises of multiple chunks from different files. Since, they belong to a single directory,
-   * there is a single inputformat for all the chunks.
-   */
-  public static class CombineHiveInputSplit extends InputSplitShim {
-
-    private String inputFormatClassName;
-    private CombineFileSplit inputSplitShim;
-    private Map<Path, PartitionDesc> pathToPartitionInfo;
-
-    public CombineHiveInputSplit() throws IOException {
-      this(ShimLoader.getHadoopShims().getCombineFileInputFormat().getInputSplitShim());
-    }
-
-    public CombineHiveInputSplit(CombineFileSplit inputSplitShim) throws IOException {
-      this(inputSplitShim.getJob(), inputSplitShim);
-    }
-
-    public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) throws IOException {
-      this(job, inputSplitShim, null);
-    }
-
-    public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim,
-        Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException {
-      this.inputSplitShim = inputSplitShim;
-      this.pathToPartitionInfo = pathToPartitionInfo;
-      if (job != null) {
-        if (this.pathToPartitionInfo == null) {
-          this.pathToPartitionInfo = Utilities.getMapWork(job).getPathToPartitionInfo();
-        }
-
-        // extract all the inputFormatClass names for each chunk in the
-        // CombinedSplit.
-        Path[] ipaths = inputSplitShim.getPaths();
-        if (ipaths.length > 0) {
-          PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(this.pathToPartitionInfo,
-              ipaths[0], IOPrepareCache.get().getPartitionDescMap());
-          inputFormatClassName = part.getInputFileFormatClass().getName();
-        }
-      }
-    }
-
-    public CombineFileSplit getInputSplitShim() {
-      return inputSplitShim;
-    }
-
-    /**
-     * Returns the inputFormat class name for the i-th chunk.
-     */
-    public String inputFormatClassName() {
-      return inputFormatClassName;
-    }
-
-    public void setInputFormatClassName(String inputFormatClassName) {
-      this.inputFormatClassName = inputFormatClassName;
-    }
-
-    @Override
-    public JobConf getJob() {
-      return inputSplitShim.getJob();
-    }
-
-    @Override
-    public long getLength() {
-      return inputSplitShim.getLength();
-    }
-
-    /**
-     * Returns an array containing the startoffsets of the files in the split.
-     */
-    @Override
-    public long[] getStartOffsets() {
-      return inputSplitShim.getStartOffsets();
-    }
-
-    /**
-     * Returns an array containing the lengths of the files in the split.
-     */
-    @Override
-    public long[] getLengths() {
-      return inputSplitShim.getLengths();
-    }
-
-    /**
-     * Returns the start offset of the i<sup>th</sup> Path.
-     */
-    @Override
-    public long getOffset(int i) {
-      return inputSplitShim.getOffset(i);
-    }
-
-    /**
-     * Returns the length of the i<sup>th</sup> Path.
-     */
-    @Override
-    public long getLength(int i) {
-      return inputSplitShim.getLength(i);
-    }
-
-    /**
-     * Returns the number of Paths in the split.
-     */
-    @Override
-    public int getNumPaths() {
-      return inputSplitShim.getNumPaths();
-    }
-
-    /**
-     * Returns the i<sup>th</sup> Path.
-     */
-    @Override
-    public Path getPath(int i) {
-      return inputSplitShim.getPath(i);
-    }
-
-    /**
-     * Returns all the Paths in the split.
-     */
-    @Override
-    public Path[] getPaths() {
-      return inputSplitShim.getPaths();
-    }
-
-    /**
-     * Returns all the Paths where this input-split resides.
-     */
-    @Override
-    public String[] getLocations() throws IOException {
-      return inputSplitShim.getLocations();
-    }
-
-    /**
-     * Prints this obejct as a string.
-     */
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append(inputSplitShim.toString());
-      sb.append("InputFormatClass: " + inputFormatClassName);
-      sb.append("\n");
-      return sb.toString();
-    }
-
-    /**
-     * Writable interface.
-     */
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      inputSplitShim.readFields(in);
-      inputFormatClassName = in.readUTF();
-    }
-
-    /**
-     * Writable interface.
-     */
-    @Override
-    public void write(DataOutput out) throws IOException {
-      inputSplitShim.write(out);
-      if (inputFormatClassName == null) {
-        if (pathToPartitionInfo == null) {
-          pathToPartitionInfo = Utilities.getMapWork(getJob()).getPathToPartitionInfo();
-        }
-
-        // extract all the inputFormatClass names for each chunk in the
-        // CombinedSplit.
-        PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo,
-            inputSplitShim.getPath(0), IOPrepareCache.get().getPartitionDescMap());
-
-        // create a new InputFormat instance if this is the first time to see
-        // this class
-        inputFormatClassName = part.getInputFileFormatClass().getName();
-      }
-
-      out.writeUTF(inputFormatClassName);
-    }
-  }
-
-  // Splits are not shared across different partitions with different input formats.
-  // For example, 2 partitions (1 sequencefile and 1 rcfile) will have 2 different splits
-  private static class CombinePathInputFormat {
-
-    private final List<Operator<? extends OperatorDesc>> opList;
-    private final String inputFormatClassName;
-    private final String deserializerClassName;
-
-    public CombinePathInputFormat(List<Operator<? extends OperatorDesc>> opList, String inputFormatClassName,
-        String deserializerClassName) {
-      this.opList = opList;
-      this.inputFormatClassName = inputFormatClassName;
-      this.deserializerClassName = deserializerClassName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof CombinePathInputFormat) {
-        CombinePathInputFormat mObj = (CombinePathInputFormat) o;
-        return (opList.equals(mObj.opList)) && (inputFormatClassName.equals(mObj.inputFormatClassName))
-            && (Objects.equals(deserializerClassName, mObj.deserializerClassName));
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return (opList == null) ? 0 : opList.hashCode();
-    }
-  }
-
   /**
    * Create Hive splits based on CombineFileSplit.
    */
@@ -391,6 +156,16 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         LOG.info("Setting hoodie filter and realtime input format");
         combine.setHoodieFilter(true);
         combine.setRealTime(true);
+        if (job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").isEmpty()) {
+          List<String> partitions = new ArrayList<>(part.getPartSpec().keySet());
+          if (!partitions.isEmpty()) {
+            String partitionStr = String.join(",", partitions);
+            LOG.info("Setting Partitions in jobConf - Partition Keys for Path : " + path + " is :" + partitionStr);
+            job.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, partitionStr);
+          } else {
+            job.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
+          }
+        }
       }
       String deserializerClassName = null;
       try {
@@ -472,7 +247,16 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     }
 
     for (CombineFileSplit is : iss) {
-      CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is, pathToPartitionInfo);
+      final InputSplit csplit;
+      if (combine.isRealTime) {
+        if (is instanceof HoodieCombineRealtimeHiveSplit) {
+          csplit = is;
+        } else {
+          csplit = new HoodieCombineRealtimeHiveSplit(job, is, pathToPartitionInfo);
+        }
+      } else {
+        csplit = new CombineHiveInputSplit(job, is, pathToPartitionInfo);
+      }
       result.add(csplit);
     }
 
@@ -727,13 +511,249 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
 
     pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath(0));
 
-    return ShimLoader.getHadoopShims().getCombineFileInputFormat().getRecordReader(job, (CombineFileSplit) split,
-        reporter, CombineHiveRecordReader.class);
+    if (inputFormatClass.getName().equals(HoodieParquetInputFormat.class.getName())) {
+      return ShimLoader.getHadoopShims().getCombineFileInputFormat().getRecordReader(job, (CombineFileSplit) split,
+          reporter, CombineHiveRecordReader.class);
+    } else if (inputFormatClass.getName().equals(HoodieParquetRealtimeInputFormat.class.getName())) {
+      HoodieCombineFileInputFormatShim shims = new HoodieCombineFileInputFormatShim();
+      IOContextMap.get(job).setInputPath(((CombineHiveInputSplit) split).getPath(0));
+      return shims.getRecordReader(job, ((CombineHiveInputSplit) split).getInputSplitShim(),
+          reporter, CombineHiveRecordReader.class);
+    } else {
+      throw new HoodieException("Unexpected input format : " + inputFormatClassName);
+    }
   }
 
-  static class CombineFilter implements PathFilter {
+  /**
+   * Identifies input formats that can opt out of combine split generation for a path via {@code shouldSkipCombine}.
+   */
+  public interface AvoidSplitCombination {
 
-    private final Set<String> pStrings = new HashSet<>();
+    boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
+  }
+
+  /**
+   * CombineHiveInputSplit encapsulates an InputSplit with its corresponding inputFormatClassName. A
+   * CombineHiveInputSplit comprises multiple chunks from different files. Since they belong to a single directory,
+   * there is a single inputformat for all the chunks.
+   */
+  public static class CombineHiveInputSplit extends InputSplitShim {
+
+    private String inputFormatClassName;
+    protected CombineFileSplit inputSplitShim;
+    private Map<Path, PartitionDesc> pathToPartitionInfo;
+
+    public CombineHiveInputSplit() throws IOException {
+      this(ShimLoader.getHadoopShims().getCombineFileInputFormat().getInputSplitShim());
+    }
+
+    public CombineHiveInputSplit(CombineFileSplit inputSplitShim) throws IOException {
+      this(inputSplitShim.getJob(), inputSplitShim);
+    }
+
+    public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) throws IOException {
+      this(job, inputSplitShim, null);
+    }
+
+    public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim,
+        Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException {
+      this.inputSplitShim = inputSplitShim;
+      this.pathToPartitionInfo = pathToPartitionInfo;
+      if (job != null) {
+        if (this.pathToPartitionInfo == null) {
+          this.pathToPartitionInfo = Utilities.getMapWork(job).getPathToPartitionInfo();
+        }
+
+        // take the inputFormatClass name from the partition of the first path; it
+        // applies to every chunk in the CombinedSplit.
+        Path[] ipaths = inputSplitShim.getPaths();
+        if (ipaths.length > 0) {
+          PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(this.pathToPartitionInfo,
+              ipaths[0], IOPrepareCache.get().getPartitionDescMap());
+          inputFormatClassName = part.getInputFileFormatClass().getName();
+        }
+      }
+    }
+
+    public CombineFileSplit getInputSplitShim() {
+      return inputSplitShim;
+    }
+
+    /**
+     * Returns the inputFormat class name shared by all chunks of this split.
+     */
+    public String inputFormatClassName() {
+      return inputFormatClassName;
+    }
+
+    public void setInputFormatClassName(String inputFormatClassName) {
+      this.inputFormatClassName = inputFormatClassName;
+    }
+
+    @Override
+    public JobConf getJob() {
+      return inputSplitShim.getJob();
+    }
+
+    @Override
+    public long getLength() {
+      return inputSplitShim.getLength();
+    }
+
+    /**
+     * Returns an array containing the startoffsets of the files in the split.
+     */
+    @Override
+    public long[] getStartOffsets() {
+      return inputSplitShim.getStartOffsets();
+    }
+
+    /**
+     * Returns an array containing the lengths of the files in the split.
+     */
+    @Override
+    public long[] getLengths() {
+      return inputSplitShim.getLengths();
+    }
+
+    /**
+     * Returns the start offset of the i<sup>th</sup> Path.
+     */
+    @Override
+    public long getOffset(int i) {
+      return inputSplitShim.getOffset(i);
+    }
+
+    /**
+     * Returns the length of the i<sup>th</sup> Path.
+     */
+    @Override
+    public long getLength(int i) {
+      return inputSplitShim.getLength(i);
+    }
+
+    /**
+     * Returns the number of Paths in the split.
+     */
+    @Override
+    public int getNumPaths() {
+      return inputSplitShim.getNumPaths();
+    }
+
+    /**
+     * Returns the i<sup>th</sup> Path.
+     */
+    @Override
+    public Path getPath(int i) {
+      return inputSplitShim.getPath(i);
+    }
+
+    /**
+     * Returns all the Paths in the split.
+     */
+    @Override
+    public Path[] getPaths() {
+      return inputSplitShim.getPaths();
+    }
+
+    /**
+     * Returns all the locations where this input-split resides.
+     */
+    @Override
+    public String[] getLocations() throws IOException {
+      return inputSplitShim.getLocations();
+    }
+
+    /**
+     * Prints this object as a string.
+     */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(inputSplitShim.toString());
+      sb.append("InputFormatClass: " + inputFormatClassName);
+      sb.append("\n");
+      return sb.toString();
+    }
+
+    /**
+     * Writable interface. Reads the fields in the same order that {@link #write(DataOutput)} emits them.
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      inputFormatClassName = Text.readString(in);
+      if (HoodieParquetRealtimeInputFormat.class.getName().equals(inputFormatClassName)) {
+        String inputShimClassName = Text.readString(in);
+        inputSplitShim = ReflectionUtils.loadClass(inputShimClassName);
+        inputSplitShim.readFields(in);
+      } else {
+        inputSplitShim.readFields(in);
+      }
+    }
+
+    /**
+     * Writable interface. Writes the format class name, the shim class name (realtime format only), then the shim.
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      if (inputFormatClassName == null) {
+        if (pathToPartitionInfo == null) {
+          pathToPartitionInfo = Utilities.getMapWork(getJob()).getPathToPartitionInfo();
+        }
+
+        // look up the partition of the first path; its input format is taken as
+        // the input format of the whole CombinedSplit.
+        PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo,
+            inputSplitShim.getPath(0), IOPrepareCache.get().getPartitionDescMap());
+
+        // remember the class name in the field so this lookup is skipped on
+        // subsequent calls
+        inputFormatClassName = part.getInputFileFormatClass().getName();
+      }
+      Text.writeString(out, inputFormatClassName);
+      if (HoodieParquetRealtimeInputFormat.class.getName().equals(inputFormatClassName)) {
+        // write the shim class name so readFields can instantiate the same shim type
+        Text.writeString(out, inputSplitShim.getClass().getName());
+      }
+      inputSplitShim.write(out);
+    }
+  }
+
+  // Splits are not shared across different partitions with different input formats.
+  // For example, 2 partitions (1 sequencefile and 1 rcfile) will have 2 different splits
+  private static class CombinePathInputFormat {
+
+    private final List<Operator<? extends OperatorDesc>> opList;
+    private final String inputFormatClassName;
+    private final String deserializerClassName;
+
+    public CombinePathInputFormat(List<Operator<? extends OperatorDesc>> opList, String inputFormatClassName,
+        String deserializerClassName) {
+      this.opList = opList;
+      this.inputFormatClassName = inputFormatClassName;
+      this.deserializerClassName = deserializerClassName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof CombinePathInputFormat) {
+        CombinePathInputFormat mObj = (CombinePathInputFormat) o;
+        return (opList.equals(mObj.opList)) && (inputFormatClassName.equals(mObj.inputFormatClassName))
+            && (deserializerClassName == null ? (mObj.deserializerClassName == null)
+            : deserializerClassName.equals(mObj.deserializerClassName));
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return (opList == null) ? 0 : opList.hashCode();
+    }
+  }
+
+  static class CombineFilter implements PathFilter {
+
+    private final Set<String> pStrings = new HashSet<>();
 
     // store a path prefix in this TestFilter
     // PRECONDITION: p should always be a directory
@@ -776,14 +796,6 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
   }
 
   /**
-   * This is a marker interface that is used to identify the formats where combine split generation is not applicable.
-   */
-  public interface AvoidSplitCombination {
-
-    boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
-  }
-
-  /**
    * **MOD** This is the implementation of CombineFileInputFormat which is a copy of
    * org.apache.hadoop.hive.shims.HadoopShimsSecure.CombineFileInputFormatShim with changes in listStatus.
    */
@@ -793,7 +805,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     private boolean hoodieFilter = false;
     private boolean isRealTime = false;
 
-    public HoodieCombineFileInputFormatShim() {}
+    public HoodieCombineFileInputFormatShim() {
+    }
 
     @Override
     public Path[] getInputPathsShim(JobConf conf) {
@@ -840,6 +853,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     @Override
     public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOException {
       long minSize = job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 0L);
+      long maxSize = job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, minSize);
       if (job.getLong("mapreduce.input.fileinputformat.split.minsize.per.node", 0L) == 0L) {
         super.setMinSplitSizeNode(minSize);
       }
@@ -851,19 +865,48 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
       if (job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, 0L) == 0L) {
         super.setMaxSplitSize(minSize);
       }
-
-      InputSplit[] splits = super.getSplits(job, numSplits);
-      List<InputSplitShim> inputSplitShims = new ArrayList<>();
-
-      for (InputSplit inputSplit : splits) {
-        CombineFileSplit split = (CombineFileSplit) inputSplit;
-        if (split.getPaths().length > 0) {
-          inputSplitShims.add(new InputSplitShim(job, split.getPaths(), split.getStartOffsets(),
-              split.getLengths(), split.getLocations()));
+      LOG.info("mapreduce.input.fileinputformat.split.minsize=" + minSize
+          + ", mapreduce.input.fileinputformat.split.maxsize=" + maxSize);
+
+      if (isRealTime) {
+        job.set("hudi.hive.realtime", "true");
+        InputSplit[] splits;
+        if (hoodieFilter) {
+          HoodieParquetInputFormat input = new HoodieParquetRealtimeInputFormat();
+          input.setConf(job);
+          splits = input.getSplits(job, numSplits);
+        } else {
+          splits = super.getSplits(job, numSplits);
+        }
+        ArrayList<CombineFileSplit> combineFileSplits = new ArrayList<>();
+        HoodieCombineRealtimeFileSplit.Builder builder = new HoodieCombineRealtimeFileSplit.Builder();
+        int counter = 0;
+        for (int pos = 0; pos < splits.length; pos++) {
+          if (counter == maxSize - 1 || pos == splits.length - 1) {
+            builder.addSplit((FileSplit)splits[pos]);
+            combineFileSplits.add(builder.build(job));
+            builder = new HoodieCombineRealtimeFileSplit.Builder();
+            counter = 0;
+          } else if (counter < maxSize) {
+            counter++;
+            builder.addSplit((FileSplit)splits[pos]);
+          }
+        }
+        return combineFileSplits.toArray(new CombineFileSplit[combineFileSplits.size()]);
+      } else {
+        InputSplit[] splits = super.getSplits(job, numSplits);
+        ArrayList inputSplitShims = new ArrayList();
+
+        for (int pos = 0; pos < splits.length; ++pos) {
+          CombineFileSplit split = (CombineFileSplit) splits[pos];
+          if (split.getPaths().length > 0) {
+            inputSplitShims.add(new HadoopShimsSecure.InputSplitShim(job, split.getPaths(), split.getStartOffsets(),
+                split.getLengths(), split.getLocations()));
+          }
         }
+        return (CombineFileSplit[]) inputSplitShims
+            .toArray(new HadoopShimsSecure.InputSplitShim[inputSplitShims.size()]);
       }
-
-      return inputSplitShims.toArray(new HadoopShimsSecure.InputSplitShim[inputSplitShims.size()]);
     }
 
     @Override
@@ -874,6 +917,16 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     @Override
     public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter,
         Class<RecordReader<K, V>> rrClass) throws IOException {
+      isRealTime = Boolean.valueOf(job.get("hudi.hive.realtime", "false"));
+      if (isRealTime) {
+        List<RecordReader> recordReaders = new LinkedList<>();
+        ValidationUtils.checkArgument(split instanceof HoodieCombineRealtimeFileSplit, "Only "
+            + HoodieCombineRealtimeFileSplit.class.getName() + " allowed, found " + split.getClass().getName());
+        for (InputSplit inputSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) {
+          recordReaders.add(new HoodieParquetRealtimeInputFormat().getRecordReader(inputSplit, job, reporter));
+        }
+        return new HoodieCombineRealtimeRecordReader(job, split, recordReaders);
+      }
       return new HadoopShimsSecure.CombineFileRecordReader(job, split, reporter, rrClass);
     }
 
@@ -885,4 +938,39 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
       isRealTime = realTime;
     }
   }
+
+  /** Callable that checks one slice of the input paths and collects the indices whose format asks to skip combining. */
+  private class CheckNonCombinablePathCallable implements Callable<Set<Integer>> {
+    private final Path[] paths;
+    private final int start;
+    private final int length;
+    private final JobConf conf;
+
+    public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
+      this.paths = paths;
+      this.start = start;
+      this.length = length;
+      this.conf = conf;
+    }
+
+    @Override
+    public Set<Integer> call() throws Exception {
+      Set<Integer> parkedIndices = new HashSet<>();
+      for (int idx = start; idx < start + length; idx++) {
+        Path candidate = paths[idx];
+        PartitionDesc desc = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo,
+            candidate, IOPrepareCache.get().allocatePartitionDescMap());
+        InputFormat<WritableComparable, Writable> format = getInputFormatFromCache(desc.getInputFileFormatClass(), conf);
+        if (!(format instanceof AvoidSplitCombination)
+            || !((AvoidSplitCombination) format).shouldSkipCombine(candidate, conf)) {
+          continue;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The path [" + candidate + "] is being parked for HiveInputFormat.getSplits");
+        }
+        parkedIndices.add(idx);
+      }
+      return parkedIndices;
+    }
+  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeFileSplit.java
new file mode 100644
index 0000000..a30aa17
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeFileSplit.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.hive;
+
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ArrayUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Represents a CombineFileSplit for realtime tables. */
+public class HoodieCombineRealtimeFileSplit extends CombineFileSplit {
+  // These are instances of HoodieRealtimeSplits
+  List<FileSplit> realtimeFileSplits = new ArrayList<>();
+
+  public HoodieCombineRealtimeFileSplit() {
+  }
+
+  public HoodieCombineRealtimeFileSplit(JobConf jobConf, List<FileSplit> realtimeFileSplits) {
+    super(jobConf,
+        realtimeFileSplits.stream().map(p -> ((HoodieRealtimeFileSplit) p).getPath()).toArray(Path[]::new),
+        ArrayUtils.toPrimitive(realtimeFileSplits.stream().map(p -> ((HoodieRealtimeFileSplit) p).getStart())
+            .toArray(Long[]::new)),
+        ArrayUtils.toPrimitive(realtimeFileSplits.stream().map(p -> ((HoodieRealtimeFileSplit) p).getLength())
+            .toArray(Long[]::new)),
+        // Size the array from the flattened host list; sizing it by split count null-padded it when hosts < splits
+        realtimeFileSplits.stream().flatMap(p -> {
+          try {
+            return Arrays.stream(p.getLocations());
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+        }).collect(Collectors.toList()).toArray(new String[0]));
+    this.realtimeFileSplits = realtimeFileSplits;
+  }
+
+  public List<FileSplit> getRealtimeFileSplits() {
+    return realtimeFileSplits;
+  }
+
+  @Override
+  public String toString() {
+    return "HoodieCombineRealtimeFileSplit{realtimeFileSplits=" + realtimeFileSplits + '}';
+  }
+
+  /** Writable interface: writes the split count, then each split's class name followed by the split itself. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(realtimeFileSplits.size());
+    for (InputSplit inputSplit: realtimeFileSplits) {
+      Text.writeString(out, inputSplit.getClass().getName());
+      inputSplit.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int listLength = in.readInt();
+    realtimeFileSplits = new ArrayList<>(listLength);
+    for (int i = 0; i < listLength; i++) {
+      HoodieRealtimeFileSplit inputSplit = ReflectionUtils.loadClass(Text.readString(in));
+      inputSplit.readFields(in);
+      realtimeFileSplits.add(inputSplit);
+    }
+  }
+
+  /** NOTE(review): returns the number of wrapped splits, not a byte count - confirm this is intended. */
+  @Override
+  public long getLength() {
+    return realtimeFileSplits.size();
+  }
+
+  /** Returns an array containing the start offsets of the files in the split (always 0 here). */
+  @Override
+  public long[] getStartOffsets() {
+    return realtimeFileSplits.stream().mapToLong(x -> 0L).toArray();
+  }
+
+  /** Returns an array containing the lengths of the files in the split. */
+  @Override
+  public long[] getLengths() {
+    return realtimeFileSplits.stream().mapToLong(FileSplit::getLength).toArray();
+  }
+
+  /** Returns the start offset of the i<sup>th</sup> Path (always 0 here). */
+  @Override
+  public long getOffset(int i) {
+    return 0;
+  }
+
+  /** Returns the length of the i<sup>th</sup> Path. */
+  @Override
+  public long getLength(int i) {
+    return realtimeFileSplits.get(i).getLength();
+  }
+
+  /** Returns the number of Paths in the split. */
+  @Override
+  public int getNumPaths() {
+    return realtimeFileSplits.size();
+  }
+
+  /** Returns the i<sup>th</sup> Path. */
+  @Override
+  public Path getPath(int i) {
+    return realtimeFileSplits.get(i).getPath();
+  }
+
+  /** Returns all the Paths in the split. */
+  @Override
+  public Path[] getPaths() {
+    return realtimeFileSplits.stream().map(x -> x.getPath()).toArray(Path[]::new);
+  }
+
+  /** Returns the concatenated locations reported by every wrapped split, in split order. */
+  @Override
+  public String[] getLocations() throws IOException {
+    return realtimeFileSplits.stream().flatMap(x -> {
+      try {
+        return Arrays.stream(x.getLocations());
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    }).toArray(String[]::new);
+  }
+
+  public static class Builder {
+    // These are instances of HoodieRealtimeSplits
+    public List<FileSplit> fileSplits = new ArrayList<>();
+
+    public void addSplit(FileSplit split) {
+      fileSplits.add(split);
+    }
+
+    public HoodieCombineRealtimeFileSplit build(JobConf conf) {
+      return new HoodieCombineRealtimeFileSplit(conf, fileSplits);
+    }
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeHiveSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeHiveSplit.java
new file mode 100644
index 0000000..c29e51b
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineRealtimeHiveSplit.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.hive;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
+import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat.CombineHiveInputSplit;
+
+/**
+ * {@link CombineHiveInputSplit} variant that represents a combined split of realtime tables.
+ */
+public class HoodieCombineRealtimeHiveSplit extends CombineHiveInputSplit {
+
+  public HoodieCombineRealtimeHiveSplit() throws IOException {
+    super(ShimLoader.getHadoopShims().getCombineFileInputFormat().getInputSplitShim());
+  }
+
+  /** Passes all three arguments straight through to the parent constructor. */
+  public HoodieCombineRealtimeHiveSplit(JobConf jobConf, CombineFileSplit combineFileSplit,
+      Map<Path, PartitionDesc> map) throws IOException {
+    super(jobConf, combineFileSplit, map);
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 1c484e8..3edae5c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -360,7 +360,9 @@ public abstract class AbstractRealtimeRecordReader {
   private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
     // Get all column names of hive table
     String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
+    LOG.info("Hive Columns : " + hiveColumnString);
     String[] hiveColumns = hiveColumnString.split(",");
+    LOG.info("Hive Columns : " + hiveColumnString);
     List<Field> hiveSchemaFields = new ArrayList<>();
 
     for (String columnName : hiveColumns) {
@@ -378,6 +380,7 @@ public abstract class AbstractRealtimeRecordReader {
     Schema hiveSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(), writerSchema.getNamespace(),
         writerSchema.isError());
     hiveSchema.setFields(hiveSchemaFields);
+    LOG.info("HIVE Schema is :" + hiveSchema.toString(true));
     return hiveSchema;
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
new file mode 100644
index 0000000..bdf11ed
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Reads the realtime file splits that CombineInputFormat grouped into one split, one split after another.
+ */
+public class HoodieCombineRealtimeRecordReader implements RecordReader<NullWritable, ArrayWritable> {
+
+  private static final transient Logger LOG = LogManager.getLogger(HoodieCombineRealtimeRecordReader.class);
+  // RecordReaders of the splits that have not been started yet, in split order
+  List<HoodieRealtimeRecordReader> recordReaders = new LinkedList<>();
+  // Points to the currently iterating record reader
+  HoodieRealtimeRecordReader currentRecordReader;
+
+  public HoodieCombineRealtimeRecordReader(JobConf jobConf, CombineFileSplit split, List<RecordReader> readers) {
+    try {
+      ValidationUtils.checkArgument(((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits().size() == readers
+          .size(), "Num Splits does not match number of unique RecordReaders!");
+      for (InputSplit rtSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) {
+        LOG.info("Creating new RealtimeRecordReader for split");
+        recordReaders.add(new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) rtSplit, jobConf, readers.remove(0)));
+      }
+      currentRecordReader = recordReaders.remove(0);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws IOException {
+    // Loop instead of trying only the next reader: a split without records must not end the iteration early.
+    // (The per-record INFO logging that used to be here was removed; it is far too costly on this hot path.)
+    while (!this.currentRecordReader.next(key, value)) {
+      if (recordReaders.isEmpty()) {
+        return false;
+      }
+      this.currentRecordReader.close();
+      this.currentRecordReader = recordReaders.remove(0);
+    }
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return this.currentRecordReader.createKey();
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return this.currentRecordReader.createValue();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return this.currentRecordReader.getPos();
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.currentRecordReader.close();
+    while (!recordReaders.isEmpty()) { // readers of splits that were never reached must be closed too
+      recordReaders.remove(0).close();
+    }
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return this.currentRecordReader.getProgress();
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index ebb784d..ce86807 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -262,4 +262,4 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
   public Configuration getConf() {
     return conf;
   }
-}
+}
\ No newline at end of file
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
index c4b79cb..a8af067 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -66,6 +66,7 @@ public class HoodieRealtimeRecordReader implements RecordReader<NullWritable, Ar
         LOG.info("Enabling un-merged reading of realtime records");
         return new RealtimeUnmergedRecordReader(split, jobConf, realReader);
       }
+      LOG.info("Enabling merged reading of realtime records for split " + split);
       return new RealtimeCompactedRecordReader(split, jobConf, realReader);
     } catch (IOException ex) {
       LOG.error("Got exception when constructing record reader", ex);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index bb31384..f1e81a7 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -18,6 +18,15 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
@@ -29,17 +38,6 @@ import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.hadoop.RecordReaderValueIterator;
 import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader<NullWritable, ArrayWritable> {
 
@@ -84,7 +82,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
           GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();
           ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, getWriterSchema());
           this.executor.getQueue().insertRecord(aWritable);
-        });
+    });
     // Start reading and buffering
     this.executor.startProducers();
   }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java
index 0f7bc1e..a575403 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java
@@ -20,14 +20,23 @@ package org.apache.hudi.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.SchemaTestUtil;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.junit.rules.TemporaryFolder;
@@ -37,9 +46,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 public class InputFormatTestUtil {
 
@@ -202,4 +214,92 @@ public class InputFormatTestUtil {
     }
 
   }
+
+  /** Appends one rollback command block, targeting {@code rolledBackInstant}, to the log of {@code fileId}. */
+  public static HoodieLogFormat.Writer writeRollback(File partitionDir, FileSystem fs, String fileId, String baseCommit,
+      String newCommit, String rolledBackInstant, int logVersion) throws InterruptedException, IOException {
+    HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
+        .onParentPath(new Path(partitionDir.getPath())).withFileId(fileId).overBaseCommit(baseCommit).withFs(fs)
+        .withLogVersion(logVersion).withLogWriteToken("1-0-1").withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .build();
+    // Header: instant of this block, the instant being rolled back and the command type
+    Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
+    blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
+    blockHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, rolledBackInstant);
+    blockHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+    // Hand back the writer returned by appendBlock
+    return logWriter.appendBlock(new HoodieCommandBlock(blockHeader));
+  }
+
+  /** Appends {@code numberOfRecords} generated records, numbered from {@code offset}, as one avro data block. */
+  public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, FileSystem fs, Schema schema,
+      String fileId, String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
+      throws InterruptedException, IOException {
+    HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
+        .onParentPath(new Path(partitionDir.getPath())).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withFileId(fileId).withLogVersion(logVersion).withLogWriteToken("1-0-1").overBaseCommit(baseCommit)
+        .withFs(fs).build();
+    // NOTE: the literal "fileid0" (not the fileId argument) is what gets passed to the record generator
+    List<IndexedRecord> generated = new ArrayList<>();
+    for (int recordNum = offset; recordNum < offset + numberOfRecords; recordNum++) {
+      generated.add(SchemaTestUtil.generateAvroRecordFromJson(schema, recordNum, newCommit, "fileid0"));
+    }
+    // The header carries the schema of the first generated record rather than the schema argument
+    Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
+    blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
+    blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, generated.get(0).getSchema().toString());
+    return logWriter.appendBlock(new HoodieAvroDataBlock(generated, blockHeader));
+  }
+
+  /** Appends a rollback command block for {@code oldCommit}; unlike writeRollback, its header also carries the schema. */
+  public static HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionDir, FileSystem fs, Schema schema,
+      String fileId, String baseCommit, String newCommit, String oldCommit, int logVersion)
+      throws InterruptedException, IOException {
+    HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
+        .onParentPath(new Path(partitionDir.getPath())).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withFileId(fileId).overBaseCommit(baseCommit).withLogVersion(logVersion).withFs(fs)
+        .build();
+
+    // Header: instant of this block, schema, the instant being rolled back and the command type
+    Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
+    blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
+    blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    blockHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, oldCommit);
+    blockHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+    return logWriter.appendBlock(new HoodieCommandBlock(blockHeader));
+  }
+
+  /** Sets the hive column, column-type, projection and partition properties derived from {@code schema}. */
+  public static void setPropsForInputFormat(JobConf jobConf, Schema schema, String hiveColumnTypes) {
+    List<Schema.Field> schemaFields = schema.getFields();
+    String readColumnNames = schemaFields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
+    String readColumnIds =
+        schemaFields.stream().map(field -> String.valueOf(field.pos())).collect(Collectors.joining(","));
+    // Hive column list: every field except "datestr" in schema order, then the partition column "datestr"
+    String hiveColumnNames = schemaFields.stream().map(Schema.Field::name)
+        .filter(name -> !name.equalsIgnoreCase("datestr")).collect(Collectors.joining(",")) + ",datestr";
+    String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes) + ",string";
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColumnNames);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColumnIds);
+    jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    // The same properties are mirrored into a default hadoop conf that is then added to jobConf as a resource
+    Configuration conf = HoodieTestUtils.getDefaultHadoopConf();
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColumnNames);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColumnIds);
+    conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
+    jobConf.addResource(conf);
+  }
+
+  public static void setInputPath(JobConf jobConf, String inputPath) {
+    // The first key used to be set twice in a row; the redundant second call was dropped
+    jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
+    jobConf.set("map.input.dir", inputPath);
+  }
+
 }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 1741757..536e8f6 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -56,9 +56,23 @@ import static org.junit.Assert.assertTrue;
 
 public class TestHoodieParquetInputFormat {
 
+  @Rule
+  public TemporaryFolder basePath = new TemporaryFolder();
+
   private HoodieParquetInputFormat inputFormat;
   private JobConf jobConf;
 
+  /** Asserts that, for exactly {@code expected} files, FSUtils.getCommitTime of the file name equals {@code commit}. */
+  public static void ensureFilesInCommit(String msg, FileStatus[] files, String commit, int expected) {
+    int matching = 0;
+    for (int i = 0; i < files.length; i++) {
+      if (commit.equals(FSUtils.getCommitTime(files[i].getPath().getName()))) {
+        matching++;
+      }
+    }
+    assertEquals(msg, expected, matching);
+  }
+
   @Before
   public void setUp() {
     inputFormat = new HoodieParquetInputFormat();
@@ -66,9 +80,6 @@ public class TestHoodieParquetInputFormat {
     inputFormat.setConf(jobConf);
   }
 
-  @Rule
-  public TemporaryFolder basePath = new TemporaryFolder();
-
   // Verify that HoodieParquetInputFormat does not return instants after pending compaction
   @Test
   public void testPendingCompactionWithActiveCommits() throws IOException {
@@ -373,15 +384,4 @@ public class TestHoodieParquetInputFormat {
     assertEquals(msg, expectedNumberOfRecordsInCommit, actualCount);
     assertEquals(msg, totalExpected, totalCount);
   }
-
-  public static void ensureFilesInCommit(String msg, FileStatus[] files, String commit, int expected) {
-    int count = 0;
-    for (FileStatus file : files) {
-      String commitTs = FSUtils.getCommitTime(file.getPath().getName());
-      if (commit.equals(commitTs)) {
-        count++;
-      }
-    }
-    assertEquals(msg, expected, count);
-  }
 }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieCombineHiveInputFormat.java
new file mode 100644
index 0000000..b1839ff
--- /dev/null
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieCombineHiveInputFormat.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.HoodieCommonTestHarness;
+import org.apache.hudi.common.minicluster.MiniClusterUtil;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.hudi.hadoop.InputFormatTestUtil;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+
+public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
+
+  @Rule
+  public TemporaryFolder basePath = new TemporaryFolder();
+  private JobConf jobConf;
+  private FileSystem fs;
+  private Configuration hadoopConf;
+
+  @BeforeClass
+  public static void setUpClass() throws IOException, InterruptedException {
+    // Append is not supported in LocalFileSystem, so an HDFS mini cluster needs to be set up.
+    MiniClusterUtil.setUp();
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    MiniClusterUtil.shutdown();
+  }
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    this.fs = MiniClusterUtil.fileSystem;
+    jobConf = new JobConf();
+    hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath())));
+    HoodieTestUtils.init(MiniClusterUtil.configuration, basePath.getRoot().getPath(), HoodieTableType.MERGE_ON_READ);
+  }
+
+  @Test
+  @Ignore
+  public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
+
+    Configuration conf = new Configuration();
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    final int numRecords = 1000;
+    // Create 3 parquet files with 1000 records each
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 3, numRecords, commitTime);
+    InputFormatTestUtil.commit(basePath, commitTime);
+
+    // insert 1000 update records to log file 0
+    String newCommitTime = "101";
+    HoodieLogFormat.Writer writer =
+        InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", commitTime, newCommitTime,
+            numRecords, numRecords, 0);
+    writer.close();
+    // insert 1000 update records to log file 1
+    writer =
+        InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid1", commitTime, newCommitTime,
+            numRecords, numRecords, 0);
+    writer.close();
+    // insert 1000 update records to log file 2
+    writer =
+        InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime,
+            numRecords, numRecords, 0);
+    writer.close();
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    // Set the input format
+    tblDesc.setInputFileFormatClass(HoodieCombineHiveInputFormat.class);
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    pt.put(new Path(basePath.getRoot().getAbsolutePath()), partDesc);
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    Path mapWorkPath = new Path(basePath.getRoot().getAbsolutePath());
+    Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
+    jobConf = new JobConf(conf);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    jobConf.set(HAS_MAP_WORK, "true");
+    // The following config tells Hive to choose ExecMapper to read the MAP_WORK
+    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+    // Set SPLIT_MAXSIZE to 3 so that a single combined split is created for the 3 file groups
+    jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "3");
+
+    HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
+    String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
+    InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
+    InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
+    // Since SPLIT_MAXSIZE is 3, we should create only 1 split containing all 3 file groups
+    assertEquals(splits.length, 1);
+    RecordReader<NullWritable, ArrayWritable> recordReader =
+        combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
+    NullWritable nullWritable = recordReader.createKey();
+    ArrayWritable arrayWritable = recordReader.createValue();
+    int counter = 0;
+    while (recordReader.next(nullWritable, arrayWritable)) {
+      // read over all the splits
+      counter++;
+    }
+    // should read all 3 file groups (fileid0, fileid1, fileid2) in the combined split, 1000 records each
+    assertEquals(3000, counter);
+  }
+
+}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 33856ae..be444b4 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -26,11 +26,6 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.SchemaTestUtil;
 import org.apache.hudi.common.util.collection.Pair;
@@ -39,7 +34,6 @@ import org.apache.hudi.hadoop.InputFormatTestUtil;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -68,10 +62,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -83,7 +75,8 @@ import static org.junit.Assert.assertTrue;
 public class TestHoodieRealtimeRecordReader {
 
   private static final String PARTITION_COLUMN = "datestr";
-
+  @Rule
+  public TemporaryFolder basePath = new TemporaryFolder();
   private JobConf jobConf;
   private FileSystem fs;
   private Configuration hadoopConf;
@@ -96,64 +89,12 @@ public class TestHoodieRealtimeRecordReader {
     fs = FSUtils.getFs(basePath.getRoot().getAbsolutePath(), hadoopConf);
   }
 
-  @Rule
-  public TemporaryFolder basePath = new TemporaryFolder();
-
-  private Writer writeLogFile(File partitionDir, Schema schema, String fileId, String baseCommit, String newCommit,
+  private Writer writeLogFile(File partitionDir, Schema schema, String fileId, String baseCommit, String
+      newCommit,
       int numberOfRecords) throws InterruptedException, IOException {
-    return writeDataBlockToLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0, 0);
-  }
-
-  private Writer writeRollback(File partitionDir, Schema schema, String fileId, String baseCommit, String newCommit,
-      String rolledBackInstant, int logVersion) throws InterruptedException, IOException {
-    Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())).withFileId(fileId)
-        .overBaseCommit(baseCommit).withFs(fs).withLogVersion(logVersion).withLogWriteToken("1-0-1")
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-    // generate metadata
-    Map<HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HeaderMetadataType.INSTANT_TIME, newCommit);
-    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, rolledBackInstant);
-    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
-        String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
-    // if update belongs to an existing log file
-    writer = writer.appendBlock(new HoodieCommandBlock(header));
-    return writer;
-  }
-
-  private HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, Schema schema, String fileId,
-      String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
-      throws InterruptedException, IOException {
-    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath()))
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).withLogVersion(logVersion)
-        .withLogWriteToken("1-0-1").overBaseCommit(baseCommit).withFs(fs).build();
-    List<IndexedRecord> records = new ArrayList<>();
-    for (int i = offset; i < offset + numberOfRecords; i++) {
-      records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
-    }
-    Schema writeSchema = records.get(0).getSchema();
-    Map<HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
-    writer = writer.appendBlock(dataBlock);
-    return writer;
-  }
-
-  private HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionDir, Schema schema, String fileId,
-      String baseCommit, String newCommit, String oldCommit, int logVersion) throws InterruptedException, IOException {
-    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath()))
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).overBaseCommit(baseCommit)
-        .withLogVersion(logVersion).withFs(fs).build();
-
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, oldCommit);
-    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
-        String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
-    HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header);
-    writer = writer.appendBlock(rollbackBlock);
-    return writer;
+    return InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, fileId, baseCommit, newCommit,
+        numberOfRecords, 0,
+        0);
   }
 
   @Test
@@ -213,11 +154,12 @@ public class TestHoodieRealtimeRecordReader {
 
         HoodieLogFormat.Writer writer;
         if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
-          writer = writeRollback(partitionDir, schema, "fileid0", baseInstant, instantTime,
+          writer = InputFormatTestUtil.writeRollback(partitionDir, fs, "fileid0", baseInstant, instantTime,
               String.valueOf(baseInstantTs + logVersion - 1), logVersion);
         } else {
           writer =
-              writeDataBlockToLogFile(partitionDir, schema, "fileid0", baseInstant, instantTime, 100, 0, logVersion);
+              InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", baseInstant,
+                  instantTime, 100, 0, logVersion);
         }
         long size = writer.getCurrentSize();
         writer.close();
@@ -228,7 +170,7 @@ public class TestHoodieRealtimeRecordReader {
         HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
             new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, jobConf),
             basePath.getRoot().getPath(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-                .map(h -> h.getPath().toString()).collect(Collectors.toList()),
+            .map(h -> h.getPath().toString()).collect(Collectors.toList()),
             instantTime);
 
         // create a RecordReader to be used by HoodieRealtimeRecordReader
@@ -281,7 +223,8 @@ public class TestHoodieRealtimeRecordReader {
     // insert new records to log file
     String newCommitTime = "101";
     HoodieLogFormat.Writer writer =
-        writeDataBlockToLogFile(partitionDir, schema, "fileid0", instantTime, newCommitTime, numRecords, numRecords, 0);
+        InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime,
+            numRecords, numRecords, 0);
     long size = writer.getCurrentSize();
     writer.close();
     assertTrue("block - size should be > 0", size > 0);
@@ -302,7 +245,7 @@ public class TestHoodieRealtimeRecordReader {
     jobConf.set(REALTIME_SKIP_MERGE_PROP, "true");
 
     // validate unmerged record reader
-    HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+    RealtimeUnmergedRecordReader recordReader = new RealtimeUnmergedRecordReader(split, jobConf, reader);
 
     // use reader to read base Parquet File and log file
     // here all records should be present. Also ensure log records are in order.
@@ -486,7 +429,8 @@ public class TestHoodieRealtimeRecordReader {
     schema = SchemaTestUtil.getComplexEvolvedSchema();
     String newCommitTime = "101";
     HoodieLogFormat.Writer writer =
-        writeDataBlockToLogFile(partitionDir, schema, "fileid0", instantTime, newCommitTime, numberOfLogRecords, 0, 1);
+        InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime,
+            numberOfLogRecords, 0, 1);
     long size = writer.getCurrentSize();
     logFilePaths.add(writer.getLogFile().getPath().toString());
     writer.close();
@@ -494,7 +438,8 @@ public class TestHoodieRealtimeRecordReader {
 
     // write rollback for the previous block in new log file version
     newCommitTime = "102";
-    writer = writeRollbackBlockToLogFile(partitionDir, schema, "fileid0", instantTime, newCommitTime, "101", 1);
+    writer = InputFormatTestUtil.writeRollbackBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime,
+        newCommitTime, "101", 1);
     logFilePaths.add(writer.getLogFile().getPath().toString());
     writer.close();
     InputFormatTestUtil.deltaCommit(basePath, newCommitTime);