You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/08/31 15:06:14 UTC

[hudi] branch master updated: [HUDI-960] Implementation of the HFile base and log file format. (#1804)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6461927  [HUDI-960] Implementation of the HFile base and log file format. (#1804)
6461927 is described below

commit 6461927eac3f8a225b17af5ecb6ace8c9cf1757b
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Mon Aug 31 08:05:59 2020 -0700

    [HUDI-960] Implementation of the HFile base and log file format. (#1804)
    
    * [HUDI-960] Implementation of the HFile base and log file format.
    
    1. Includes HFileWriter and HFileReader
    2. Includes HFileInputFormat for both snapshot and realtime input format for Hive
    3. Unit test for new code
    4. IT for using HFile format and querying using Hive (Presto and SparkSQL are not supported)
    
    Advantage:
    HFile file format saves data as binary key-value pairs. This implementation chooses the following values:
    1. Key = Hoodie Record Key (as bytes)
    2. Value = Avro encoded GenericRecord (as bytes)
    
    HFile allows efficient lookup of a record by key or range of keys. Hence, this base file format is well suited to applications like RFC-15, RFC-08 which will benefit from the ability to lookup records by key or search in a range of keys without having to read the entire data/log format.
    
    Limitations:
    HFile storage format has certain limitations when used as a general purpose data storage format.
    1. Does not have a implemented reader for Presto and SparkSQL
    2. Is not a columnar file format and hence may lead to lower compression levels and greater IO on query side due to lack of column pruning
    
    
    Other changes:
     - Remove databricks/avro from pom
     - Fix HoodieClientTestUtils from not using scala imports/reflection based conversion etc
     - Breaking up limitFileSize(), per parquet and hfile base files
     - Added three new configs for HoodieHFileConfig - prefetchBlocksOnOpen, cacheDataInL1, dropBehindCacheCompaction
     - Throw UnsupportedException in HFileReader.getRecordKeys()
     - Updated HoodieCopyOnWriteTable to create the correct merge handle (HoodieSortedMergeHandle for HFile and HoodieMergeHandle otherwise)
    
    * Fixing checkstyle
    
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
 hudi-client/pom.xml                                |   6 +
 .../apache/hudi/config/HoodieStorageConfig.java    |  31 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  15 +
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  20 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  11 +-
 .../apache/hudi/io/HoodieSortedMergeHandle.java    | 126 ++++++
 .../hudi/io/storage/HoodieFileWriterFactory.java   |  26 +-
 .../apache/hudi/io/storage/HoodieHFileConfig.java  |  95 +++++
 .../apache/hudi/io/storage/HoodieHFileWriter.java  | 166 ++++++++
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  |  15 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |   6 +
 .../action/commit/BaseCommitActionExecutor.java    |  43 ++-
 .../table/action/commit/CommitActionExecutor.java  |   7 +-
 .../compact/HoodieMergeOnReadTableCompactor.java   |   5 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   3 +-
 .../hudi/client/TestTableSchemaEvolution.java      |   2 +-
 .../hudi/client/TestUpdateSchemaEvolution.java     |   7 +-
 .../org/apache/hudi/index/TestHoodieIndex.java     |   2 +-
 .../apache/hudi/index/hbase/TestHBaseIndex.java    |   3 +-
 .../index/hbase/TestHBaseQPSResourceAllocator.java |   3 +-
 .../hudi/io/TestHoodieKeyLocationFetchHandle.java  |   2 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |   2 +-
 .../io/storage/TestHoodieFileWriterFactory.java    |   5 +
 .../hudi/table/TestHoodieMergeOnReadTable.java     | 240 ++++++------
 .../commit/TestCopyOnWriteActionExecutor.java      |   6 +-
 .../table/action/commit/TestUpsertPartitioner.java |   3 +-
 .../table/action/compact/CompactionTestBase.java   |   5 +-
 .../table/action/compact/TestHoodieCompactor.java  |   2 +-
 .../hudi/testutils/HoodieClientTestBase.java       |   8 +-
 .../hudi/testutils/HoodieClientTestUtils.java      |  54 ++-
 .../hudi/testutils/SparkDatasetTestUtils.java      |   2 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  16 +-
 .../table/log/AbstractHoodieLogRecordScanner.java  |   5 +
 .../hudi/common/table/log/HoodieLogFileReader.java |   5 +
 .../common/table/log/block/HoodieDataBlock.java    |   2 +
 .../table/log/block/HoodieHFileDataBlock.java      | 159 ++++++++
 .../common/table/log/block/HoodieLogBlock.java     |   2 +-
 .../apache/hudi/io/storage/HoodieFileReader.java   |  15 +-
 .../hudi/io/storage/HoodieFileReaderFactory.java   |  12 +
 .../apache/hudi/io/storage/HoodieHFileReader.java  | 300 +++++++++++++++
 .../common/functional/TestHoodieAvroLogFormat.java |  30 --
 .../common/functional/TestHoodieLogFormat.java     |  36 +-
 .../apache/hudi/hadoop/HoodieHFileInputFormat.java | 163 ++++++++
 .../hudi/hadoop/HoodieHFileRecordReader.java       | 101 +++++
 .../org/apache/hudi/hadoop/InputPathHandler.java   |   2 +-
 ...at.java => HoodieHFileRealtimeInputFormat.java} | 123 +-----
 .../realtime/HoodieParquetRealtimeInputFormat.java |  70 +---
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |  19 +
 .../utils/HoodieRealtimeInputFormatUtils.java      |  54 +++
 .../hudi/hadoop/TestHoodieHFileInputFormat.java    | 426 +++++++++++++++++++++
 .../java/org/apache/hudi/integ/ITTestBase.java     |   2 +-
 .../org/apache/hudi/integ/ITTestHoodieDemo.java    |  34 +-
 packaging/hudi-spark-bundle/pom.xml                |  17 +
 packaging/hudi-utilities-bundle/pom.xml            |  18 +-
 54 files changed, 2134 insertions(+), 398 deletions(-)

diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 0dd6a8c..1a05175 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -206,6 +206,12 @@
       <artifactId>hbase-client</artifactId>
       <version>${hbase.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- Hoodie - Tests -->
     <dependency>
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
index ac8857f..50b45f3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
@@ -39,6 +39,10 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
   public static final String DEFAULT_PARQUET_BLOCK_SIZE_BYTES = DEFAULT_PARQUET_FILE_MAX_BYTES;
   public static final String PARQUET_PAGE_SIZE_BYTES = "hoodie.parquet.page.size";
   public static final String DEFAULT_PARQUET_PAGE_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
+  public static final String HFILE_FILE_MAX_BYTES = "hoodie.hfile.max.file.size";
+  public static final String HFILE_BLOCK_SIZE_BYTES = "hoodie.hfile.block.size";
+  public static final String DEFAULT_HFILE_BLOCK_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
+  public static final String DEFAULT_HFILE_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024);
   // used to size log files
   public static final String LOGFILE_SIZE_MAX_BYTES = "hoodie.logfile.max.size";
   public static final String DEFAULT_LOGFILE_SIZE_MAX_BYTES = String.valueOf(1024 * 1024 * 1024); // 1 GB
@@ -49,8 +53,10 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
   // Default compression ratio for parquet
   public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
   public static final String PARQUET_COMPRESSION_CODEC = "hoodie.parquet.compression.codec";
+  public static final String HFILE_COMPRESSION_ALGORITHM = "hoodie.hfile.compression.algorithm";
   // Default compression codec for parquet
   public static final String DEFAULT_PARQUET_COMPRESSION_CODEC = "gzip";
+  public static final String DEFAULT_HFILE_COMPRESSION_ALGORITHM = "GZ";
   public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio";
   // Default compression ratio for log file to parquet, general 3x
   public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35);
@@ -79,7 +85,7 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
       return this;
     }
 
-    public Builder limitFileSize(long maxFileSize) {
+    public Builder parquetMaxFileSize(long maxFileSize) {
       props.setProperty(PARQUET_FILE_MAX_BYTES, String.valueOf(maxFileSize));
       return this;
     }
@@ -94,6 +100,16 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder hfileMaxFileSize(long maxFileSize) {
+      props.setProperty(HFILE_FILE_MAX_BYTES, String.valueOf(maxFileSize));
+      return this;
+    }
+
+    public Builder hfileBlockSize(int blockSize) {
+      props.setProperty(HFILE_BLOCK_SIZE_BYTES, String.valueOf(blockSize));
+      return this;
+    }
+
     public Builder logFileDataBlockMaxSize(int dataBlockSize) {
       props.setProperty(LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES, String.valueOf(dataBlockSize));
       return this;
@@ -114,6 +130,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder hfileCompressionAlgorithm(String hfileCompressionAlgorithm) {
+      props.setProperty(HFILE_COMPRESSION_ALGORITHM, hfileCompressionAlgorithm);
+      return this;
+    }
+
     public Builder logFileToParquetCompressionRatio(double logFileToParquetCompressionRatio) {
       props.setProperty(LOGFILE_TO_PARQUET_COMPRESSION_RATIO, String.valueOf(logFileToParquetCompressionRatio));
       return this;
@@ -137,6 +158,14 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
           DEFAULT_PARQUET_COMPRESSION_CODEC);
       setDefaultOnCondition(props, !props.containsKey(LOGFILE_TO_PARQUET_COMPRESSION_RATIO),
           LOGFILE_TO_PARQUET_COMPRESSION_RATIO, DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO);
+
+      setDefaultOnCondition(props, !props.containsKey(HFILE_BLOCK_SIZE_BYTES), HFILE_BLOCK_SIZE_BYTES,
+          DEFAULT_HFILE_BLOCK_SIZE_BYTES);
+      setDefaultOnCondition(props, !props.containsKey(HFILE_COMPRESSION_ALGORITHM), HFILE_COMPRESSION_ALGORITHM,
+          DEFAULT_HFILE_COMPRESSION_ALGORITHM);
+      setDefaultOnCondition(props, !props.containsKey(HFILE_FILE_MAX_BYTES), HFILE_FILE_MAX_BYTES,
+          DEFAULT_HFILE_FILE_MAX_BYTES);
+
       return config;
     }
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 249b107..7ad0f96 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.config;
 
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.bootstrap.BootstrapMode;
@@ -55,6 +56,8 @@ import java.util.stream.Collectors;
 @Immutable
 public class HoodieWriteConfig extends DefaultHoodieConfig {
 
+  private static final long serialVersionUID = 0L;
+
   public static final String TABLE_NAME = "hoodie.table.name";
   public static final String DEFAULT_ROLLBACK_USING_MARKERS = "false";
   public static final String ROLLBACK_USING_MARKERS = "hoodie.rollback.using.markers";
@@ -556,6 +559,18 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Double.parseDouble(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO));
   }
 
+  public long getHFileMaxFileSize() {
+    return Long.parseLong(props.getProperty(HoodieStorageConfig.HFILE_FILE_MAX_BYTES));
+  }
+
+  public int getHFileBlockSize() {
+    return Integer.parseInt(props.getProperty(HoodieStorageConfig.HFILE_BLOCK_SIZE_BYTES));
+  }
+
+  public Compression.Algorithm getHFileCompressionAlgorithm() {
+    return Compression.Algorithm.valueOf(props.getProperty(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM));
+  }
+
   /**
    * metrics properties.
    */
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 5a76dc7..9953278 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -45,6 +45,7 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 
 public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
 
@@ -55,7 +56,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
   private long recordsWritten = 0;
   private long insertRecordsWritten = 0;
   private long recordsDeleted = 0;
-  private Iterator<HoodieRecord<T>> recordIterator;
+  private Map<String, HoodieRecord<T>> recordMap;
   private boolean useWriterSchema = false;
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
@@ -90,9 +91,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
    * Called by the compactor code path.
    */
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-      String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) {
+      String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
+      SparkTaskContextSupplier sparkTaskContextSupplier) {
     this(config, instantTime, hoodieTable, partitionPath, fileId, sparkTaskContextSupplier);
-    this.recordIterator = recordIterator;
+    this.recordMap = recordMap;
     this.useWriterSchema = true;
   }
 
@@ -138,9 +140,17 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
    * Writes all records passed.
    */
   public void write() {
+    Iterator<String> keyIterator;
+    if (hoodieTable.requireSortedRecords()) {
+      // Sorting the keys limits the amount of extra memory required for writing sorted records
+      keyIterator = recordMap.keySet().stream().sorted().iterator();
+    } else {
+      keyIterator = recordMap.keySet().stream().iterator();
+    }
     try {
-      while (recordIterator.hasNext()) {
-        HoodieRecord<T> record = recordIterator.next();
+      while (keyIterator.hasNext()) {
+        final String key = keyIterator.next();
+        HoodieRecord<T> record = recordMap.get(key);
         if (useWriterSchema) {
           write(record, record.getData().getInsertValue(writerSchemaWithMetafields));
         } else {
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 8d54065..650237c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -58,16 +58,17 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
 
   private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
 
-  private Map<String, HoodieRecord<T>> keyToNewRecords;
-  private Set<String> writtenRecordKeys;
+  protected Map<String, HoodieRecord<T>> keyToNewRecords;
+  protected Set<String> writtenRecordKeys;
   private HoodieFileWriter<IndexedRecord> fileWriter;
+
   private Path newFilePath;
   private Path oldFilePath;
   private long recordsWritten = 0;
   private long recordsDeleted = 0;
   private long updatedRecordsWritten = 0;
-  private long insertRecordsWritten = 0;
-  private boolean useWriterSchema;
+  protected long insertRecordsWritten = 0;
+  protected boolean useWriterSchema;
   private HoodieBaseFile baseFileToMerge;
 
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
@@ -179,7 +180,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
     return writeRecord(hoodieRecord, indexedRecord);
   }
 
-  private boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
+  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
     Option recordMetadata = hoodieRecord.getData().getMetadata();
     if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
new file mode 100644
index 0000000..dda7b72
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+/**
+ * Hoodie merge handle which writes records (new inserts or updates) sorted by their key.
+ *
+ * The implementation performs a merge-sort by comparing the key of the record being written to the list of
+ * keys in newRecordKeys (sorted in-memory).
+ */
+public class HoodieSortedMergeHandle<T extends HoodieRecordPayload> extends HoodieMergeHandle<T> {
+
+  private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
+
+  public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
+       Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
+    newRecordKeysSorted.addAll(keyToNewRecords.keySet());
+  }
+
+  /**
+   * Called by compactor code path.
+   */
+  public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
+      Map<String, HoodieRecord<T>> keyToNewRecordsOrig, String partitionPath, String fileId,
+      HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecordsOrig, partitionPath, fileId, dataFileToBeMerged,
+        sparkTaskContextSupplier);
+
+    newRecordKeysSorted.addAll(keyToNewRecords.keySet());
+  }
+
+  /**
+   * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
+   */
+  @Override
+  public void write(GenericRecord oldRecord) {
+    String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+
+    // To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
+    // the oldRecord's key.
+    while (!newRecordKeysSorted.isEmpty() && newRecordKeysSorted.peek().compareTo(key) <= 0) {
+      String keyToPreWrite = newRecordKeysSorted.remove();
+      if (keyToPreWrite.equals(key)) {
+        // will be handled as an update later
+        break;
+      }
+
+      // This is a new insert
+      HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(keyToPreWrite));
+      if (writtenRecordKeys.contains(keyToPreWrite)) {
+        throw new HoodieUpsertException("Insert/Update not in sorted order");
+      }
+      try {
+        if (useWriterSchema) {
+          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
+        } else {
+          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
+        }
+        insertRecordsWritten++;
+        writtenRecordKeys.add(keyToPreWrite);
+      } catch (IOException e) {
+        throw new HoodieUpsertException("Failed to write records", e);
+      }
+    }
+
+    super.write(oldRecord);
+  }
+
+  @Override
+  public WriteStatus close() {
+    // write out any pending records (this can happen when inserts are turned into updates)
+    newRecordKeysSorted.stream().forEach(key -> {
+      try {
+        HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
+        if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
+          if (useWriterSchema) {
+            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
+          } else {
+            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
+          }
+          insertRecordsWritten++;
+        }
+      } catch (IOException e) {
+        throw new HoodieUpsertException("Failed to close UpdateHandle", e);
+      }
+    });
+    newRecordKeysSorted.clear();
+    keyToNewRecords.clear();
+
+    return super.close();
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 0fab31e..1d4a9a2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -35,6 +35,7 @@ import org.apache.parquet.avro.AvroSchemaConverter;
 import java.io.IOException;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 
 public class HoodieFileWriterFactory {
 
@@ -45,16 +46,16 @@ public class HoodieFileWriterFactory {
     if (PARQUET.getFileExtension().equals(extension)) {
       return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
     }
+    if (HFILE.getFileExtension().equals(extension)) {
+      return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
+    }
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
 
   private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
       String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
       SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
-    BloomFilter filter = BloomFilterFactory
-        .createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
-            config.getDynamicBloomFilterMaxNumEntries(),
-            config.getBloomFilterType());
+    BloomFilter filter = createBloomFilter(config);
     HoodieAvroWriteSupport writeSupport =
         new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter);
 
@@ -64,4 +65,21 @@ public class HoodieFileWriterFactory {
 
     return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, sparkTaskContextSupplier);
   }
+
+  private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newHFileFileWriter(
+      String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
+      SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
+
+    BloomFilter filter = createBloomFilter(config);
+    HoodieHFileConfig hfileConfig = new HoodieHFileConfig(hoodieTable.getHadoopConf(),
+        config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(), filter);
+
+    return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, sparkTaskContextSupplier);
+  }
+
+  private static BloomFilter createBloomFilter(HoodieWriteConfig config) {
+    return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
+            config.getDynamicBloomFilterMaxNumEntries(),
+            config.getBloomFilterType());
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
new file mode 100644
index 0000000..031f92c
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hudi.common.bloom.BloomFilter;
+
+public class HoodieHFileConfig {
+
+  private Compression.Algorithm compressionAlgorithm;
+  private int blockSize;
+  private long maxFileSize;
+  private boolean prefetchBlocksOnOpen;
+  private boolean cacheDataInL1;
+  private boolean dropBehindCacheCompaction;
+  private Configuration hadoopConf;
+  private BloomFilter bloomFilter;
+
+  // This is private in CacheConfig so have been copied here.
+  private static boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
+
+  public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
+                           long maxFileSize, BloomFilter bloomFilter) {
+    this(hadoopConf, compressionAlgorithm, blockSize, maxFileSize, CacheConfig.DEFAULT_PREFETCH_ON_OPEN,
+        HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION_DEFAULT, bloomFilter);
+  }
+
+  public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
+                           long maxFileSize, boolean prefetchBlocksOnOpen, boolean cacheDataInL1,
+                           boolean dropBehindCacheCompaction, BloomFilter bloomFilter) {
+    this.hadoopConf = hadoopConf;
+    this.compressionAlgorithm = compressionAlgorithm;
+    this.blockSize = blockSize;
+    this.maxFileSize = maxFileSize;
+    this.prefetchBlocksOnOpen = prefetchBlocksOnOpen;
+    this.cacheDataInL1 = cacheDataInL1;
+    this.dropBehindCacheCompaction = dropBehindCacheCompaction;
+    this.bloomFilter = bloomFilter;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public Compression.Algorithm getCompressionAlgorithm() {
+    return compressionAlgorithm;
+  }
+
+  public int getBlockSize() {
+    return blockSize;
+  }
+
+  public long getMaxFileSize() {
+    return maxFileSize;
+  }
+
+  public boolean shouldPrefetchBlocksOnOpen() {
+    return prefetchBlocksOnOpen;
+  }
+
+  public boolean shouldCacheDataInL1() {
+    return cacheDataInL1;
+  }
+
+  public boolean shouldDropBehindCacheCompaction() {
+    return dropBehindCacheCompaction;
+  }
+
+  public boolean useBloomFilter() {
+    return bloomFilter != null;
+  }
+
+  public BloomFilter getBloomFilter() {
+    return bloomFilter;
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
new file mode 100644
index 0000000..c7f549a
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * HoodieHFileWriter writes IndexedRecords into an HFile. The record's key is used as the key and the
+ * AVRO encoded record bytes are saved as the value.
+ *
+ * Limitations (compared to columnar formats like Parquet or ORC):
+ *  1. Records should be added in order of keys
+ *  2. There are no column stats
+ */
+public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
+    implements HoodieFileWriter<R> {
+  private static AtomicLong recordIndex = new AtomicLong(1);
+
+  private final Path file;
+  private HoodieHFileConfig hfileConfig;
+  private final HoodieWrapperFileSystem fs;
+  private final long maxFileSize;
+  private final String instantTime;
+  private final SparkTaskContextSupplier sparkTaskContextSupplier;
+  private HFile.Writer writer;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  // This is private in CacheConfig so have been copied here.
+  private static String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction";
+
+  public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileConfig, Schema schema,
+      SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
+
+    Configuration conf = FSUtils.registerFileSystem(file, hfileConfig.getHadoopConf());
+    this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
+    this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
+    this.hfileConfig = hfileConfig;
+
+    // TODO - compute this compression ratio dynamically by looking at the bytes written to the
+    // stream and the actual file size reported by HDFS
+    // this.maxFileSize = hfileConfig.getMaxFileSize()
+    //    + Math.round(hfileConfig.getMaxFileSize() * hfileConfig.getCompressionRatio());
+    this.maxFileSize = hfileConfig.getMaxFileSize();
+    this.instantTime = instantTime;
+    this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+
+    HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
+          .withCompression(hfileConfig.getCompressionAlgorithm())
+          .build();
+
+    conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, String.valueOf(hfileConfig.shouldPrefetchBlocksOnOpen()));
+    conf.set(HColumnDescriptor.CACHE_DATA_IN_L1, String.valueOf(hfileConfig.shouldCacheDataInL1()));
+    conf.set(DROP_BEHIND_CACHE_COMPACTION_KEY, String.valueOf(hfileConfig.shouldDropBehindCacheCompaction()));
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    this.writer = HFile.getWriterFactory(conf, cacheConfig).withPath(this.fs, this.file).withFileContext(context).create();
+
+    writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
+  }
+
+  @Override
+  public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
+    String seqId =
+        HoodieRecord.generateSequenceId(instantTime, sparkTaskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
+    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
+        file.getName());
+    HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
+
+    writeAvro(record.getRecordKey(), (IndexedRecord)avroRecord);
+  }
+
+  @Override
+  public boolean canWrite() {
+    return fs.getBytesWritten(file) < maxFileSize;
+  }
+
+  @Override
+  public void writeAvro(String recordKey, IndexedRecord object) throws IOException {
+    byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord)object);
+    KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, value);
+    writer.append(kv);
+
+    if (hfileConfig.useBloomFilter()) {
+      hfileConfig.getBloomFilter().add(recordKey);
+      if (minRecordKey != null) {
+        minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
+      } else {
+        minRecordKey = recordKey;
+      }
+
+      if (maxRecordKey != null) {
+        maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
+      } else {
+        maxRecordKey = recordKey;
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (hfileConfig.useBloomFilter()) {
+      final BloomFilter bloomFilter = hfileConfig.getBloomFilter();
+      if (minRecordKey == null) {
+        minRecordKey = "";
+      }
+      if (maxRecordKey == null) {
+        maxRecordKey = "";
+      }
+      writer.appendFileInfo(HoodieHFileReader.KEY_MIN_RECORD.getBytes(), minRecordKey.getBytes());
+      writer.appendFileInfo(HoodieHFileReader.KEY_MAX_RECORD.getBytes(), maxRecordKey.getBytes());
+      writer.appendFileInfo(HoodieHFileReader.KEY_BLOOM_FILTER_TYPE_CODE.getBytes(),
+          bloomFilter.getBloomFilterTypeCode().toString().getBytes());
+      writer.appendMetaBlock(HoodieHFileReader.KEY_BLOOM_FILTER_META_BLOCK, new Writable() {
+        @Override
+        public void write(DataOutput out) throws IOException {
+          out.write(bloomFilter.serializeToString().getBytes());
+        }
+
+        @Override
+        public void readFields(DataInput in) throws IOException { }
+      });
+    }
+
+    writer.close();
+    writer = null;
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 6292e56..31ba537 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieSortedMergeHandle;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.BootstrapCommitActionExecutor;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -158,7 +159,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       MergeHelper.runMerge(this, upsertHandle);
     }
 
-
     // TODO(vc): This needs to be revisited
     if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
       LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
@@ -169,14 +169,19 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
 
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
       Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
-    return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords,
-            partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier);
+    if (requireSortedRecords()) {
+      return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+          dataFileToBeMerged, sparkTaskContextSupplier);
+    } else {
+      return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+          dataFileToBeMerged, sparkTaskContextSupplier);
+    }
   }
 
   public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
-      Iterator<HoodieRecord<T>> recordItr) {
+      Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
     HoodieCreateHandle createHandle =
-        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr, sparkTaskContextSupplier);
+        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, sparkTaskContextSupplier);
     createHandle.write();
     return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 4ed32b4..71bcb31 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -593,6 +593,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
     switch (getBaseFileFormat()) {
       case PARQUET:
         return HoodieLogBlockType.AVRO_DATA_BLOCK;
+      case HFILE:
+        return HoodieLogBlockType.HFILE_DATA_BLOCK;
       default:
         throw new HoodieException("Base file format " + getBaseFileFormat()
             + " does not have associated log block format");
@@ -602,4 +604,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   public String getBaseFileExtension() {
     return getBaseFileFormat().getFileExtension();
   }
+
+  public boolean requireSortedRecords() {
+    return getBaseFileFormat() == HoodieFileFormat.HFILE;
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 5408d44..9efacde 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -18,17 +18,11 @@
 
 package org.apache.hudi.table.action.commit;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -50,9 +44,21 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 import scala.Tuple2;
 
 public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>, R>
@@ -153,9 +159,26 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>,
   }
 
   private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
-    return dedupedRecords.mapToPair(
-        record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
-        .partitionBy(partitioner).map(Tuple2::_2);
+    JavaPairRDD<Tuple2, HoodieRecord<T>> mappedRDD = dedupedRecords.mapToPair(
+        record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record));
+
+    JavaPairRDD<Tuple2, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2> comparator = (Comparator<Tuple2> & Serializable)(t1, t2) -> {
+        HoodieKey key1 = (HoodieKey) t1._1;
+        HoodieKey key2 = (HoodieKey) t2._1;
+        return key1.getRecordKey().compareTo(key2.getRecordKey());
+      };
+
+      partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
+    } else {
+      // Partition only
+      partitionedRDD = mappedRDD.partitionBy(partitioner);
+    }
+
+    return partitionedRDD.map(Tuple2::_2);
   }
 
   protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
index fc721ec..f35acaf 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
@@ -28,6 +28,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.LazyInsertIterable;
 import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieSortedMergeHandle;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -98,7 +99,11 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
   }
 
   protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-    return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
+    if (table.requireSortedRecords()) {
+      return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieTable<T>)table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
+    } else {
+      return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
+    }
   }
 
   protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
index 80afcac..c4343f8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
@@ -135,15 +135,14 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
 
     // Compacting is very similar to applying updates to existing file
     Iterator<List<WriteStatus>> result;
-    // If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a
-    // new base parquet file.
+    // If the dataFile is present, perform updates else perform inserts into a new base file.
     if (oldDataFileOpt.isPresent()) {
       result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
               operation.getFileId(), scanner.getRecords(),
           oldDataFileOpt.get());
     } else {
       result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
-          scanner.iterator());
+          scanner.getRecords());
     }
     Iterable<List<WriteStatus>> resultIterable = () -> result;
     return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 24e538e..ba4ffb4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1261,7 +1261,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
                 .insertSplitSize(insertSplitSize).build())
         .withStorageConfig(
             HoodieStorageConfig.newBuilder()
-                .limitFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
+                .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
+                .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
         .build();
   }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 21bf601..0e48746 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -411,7 +411,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
   private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException {
     if (tableType == HoodieTableType.COPY_ON_WRITE) {
       HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline();
-      assertEquals(numExpectedRecords, HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, instantTime).count());
+      assertEquals(numExpectedRecords, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, instantTime));
     } else {
       // TODO: This code fails to read records under the following conditions:
       // 1. No parquet files yet (i.e. no compaction done yet)
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 2287979..8b42671 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -45,6 +45,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -89,8 +92,10 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
       insertRecords
           .add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
 
+      Map<String, HoodieRecord> insertRecordMap = insertRecords.stream()
+          .collect(Collectors.toMap(r -> r.getRecordKey(), Function.identity()));
       HoodieCreateHandle createHandle =
-          new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecords.iterator(), supplier);
+          new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecordMap, supplier);
       createHandle.write();
       return createHandle.close();
     }).collect();
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index 39c8201..257f732 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -441,7 +441,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
         .withWriteStatusClass(MetadataMergeWriteStatus.class)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
         .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index 20406cd..b68cba6 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -465,7 +465,8 @@ public class TestHBaseIndex extends FunctionalTestHarness {
         .withParallelism(1, 1)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
             .withInlineCompaction(false).build())
-        .withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder()
+            .hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
             .withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder()
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseQPSResourceAllocator.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseQPSResourceAllocator.java
index d7591e6..8972d00 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseQPSResourceAllocator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseQPSResourceAllocator.java
@@ -76,7 +76,8 @@ public class TestHBaseQPSResourceAllocator {
         .withParallelism(1, 1)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
             .withInlineCompaction(false).build())
-        .withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder()
+            .hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1024 * 1024).build())
         .forTable("test-trip-table").withIndexConfig(HoodieIndexConfig.newBuilder()
             .withIndexType(HoodieIndex.IndexType.HBASE).withHBaseIndexConfig(hoodieHBaseIndexConfig).build());
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index 4784854..22337f5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -178,7 +178,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
         .withWriteStatusClass(MetadataMergeWriteStatus.class)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().build())
         .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index b45eae0..76baa71 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -310,7 +310,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withBulkInsertParallelism(2).withWriteStatusClass(TestWriteStatus.class);
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
index ed740ea..6ea2b7e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
@@ -50,6 +50,11 @@ public class TestHoodieFileWriterFactory extends HoodieClientTestBase {
         parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
     assertTrue(parquetWriter instanceof HoodieParquetWriter);
 
+    final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile");
+    HoodieFileWriter<IndexedRecord> hfileWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
+        hfilePath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
+    assertTrue(hfileWriter instanceof HoodieHFileWriter);
+
     // other file format exception.
     final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
     final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 09ff604..fb16af0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -49,7 +49,9 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.HoodieHFileInputFormat;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
@@ -69,9 +71,9 @@ import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -100,15 +102,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
   @TempDir
   public java.nio.file.Path tempFolder;
-  private HoodieFileFormat baseFileFormat;
 
-  static Stream<HoodieFileFormat> argumentsProvider() {
-    return Stream.of(HoodieFileFormat.PARQUET);
-  }
+  private HoodieFileFormat baseFileFormat;
 
   public void init(HoodieFileFormat baseFileFormat) throws IOException {
     this.baseFileFormat = baseFileFormat;
-
     initDFS();
     initSparkContexts("TestHoodieMergeOnReadTable");
     hadoopConf.addResource(dfs.getConf());
@@ -122,15 +120,65 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     rtJobConf = new JobConf(hadoopConf);
   }
 
+  @BeforeEach
+  public void init() throws IOException {
+    init(HoodieFileFormat.PARQUET);
+  }
+
   @AfterEach
   public void clean() throws IOException {
     cleanupResources();
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testSimpleInsertAndUpdate(HoodieFileFormat baseFileFormat) throws Exception {
-    init(baseFileFormat);
+  @Test
+  public void testSimpleInsertAndUpdate() throws Exception {
+    HoodieWriteConfig cfg = getConfig(true);
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
+
+      /**
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      insertAndGetFilePaths(records, client, cfg, newCommitTime);
+
+      /**
+       * Write 2 (updates)
+       */
+      newCommitTime = "004";
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, 100);
+      updateAndGetFilePaths(records, client, cfg, newCommitTime);
+
+      String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
+      client.compact(compactionCommitTime);
+
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
+      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
+      assertTrue(dataFilesToRead.findAny().isPresent());
+
+      // verify that there is a commit
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
+      assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(),
+          "Expecting a single commit.");
+      String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
+      assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime));
+
+      assertEquals(200, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"),
+          "Must contain 200 records");
+    }
+  }
+
+  @Test
+  public void testSimpleInsertAndUpdateHFile() throws Exception {
+    clean();
+    init(HoodieFileFormat.HFILE);
 
     HoodieWriteConfig cfg = getConfig(true);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -170,18 +218,15 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
       assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime));
 
-      assertEquals(200, HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
+      assertEquals(200, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"),
           "Must contain 200 records");
     }
   }
 
   // test incremental read does not go past compaction instant for RO views
   // For RT views, incremental read can go past compaction
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testIncrementalReadsWithCompaction(HoodieFileFormat baseFileFormat) throws Exception {
-    init(baseFileFormat);
-
+  @Test
+  public void testIncrementalReadsWithCompaction() throws Exception {
     String partitionPath = "2020/02/20"; // use only one partition for this test
     dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
     HoodieWriteConfig cfg = getConfig(true);
@@ -279,11 +324,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   }
 
   // Check if record level metadata is aggregated properly at the end of write.
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testMetadataAggregateFromWriteStatus(HoodieFileFormat baseFileFormat) throws Exception {
-    init(baseFileFormat);
-
+  @Test
+  public void testMetadataAggregateFromWriteStatus() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
@@ -305,11 +347,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testSimpleInsertUpdateAndDelete(HoodieFileFormat baseFileFormat) throws Exception {
-    init(baseFileFormat);
-
+  @Test
+  public void testSimpleInsertUpdateAndDelete() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
@@ -388,10 +427,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  private void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
-    init(baseFileFormat);
+  private void testCOWToMORConvertedTableRollback(Boolean rollbackUsingMarkers) throws Exception {
     // Set TableType to COW
-    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
+    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
 
     HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -428,7 +466,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       // Set TableType to MOR
-      HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, baseFileFormat);
+      HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
 
       // rollback a COW commit when TableType is MOR
       client.rollback(newCommitTime);
@@ -443,22 +481,19 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testCOWToMORConvertedTableRollbackUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
-    testCOWToMORConvertedTableRollback(baseFileFormat, false);
+  @Test
+  public void testCOWToMORConvertedTableRollbackUsingFileList() throws Exception {
+    testCOWToMORConvertedTableRollback(false);
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testCOWToMORConvertedTableRollbackUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
-    testCOWToMORConvertedTableRollback(baseFileFormat, true);
+  @Test
+  public void testCOWToMORConvertedTableRollbackUsingMarkers() throws Exception {
+    testCOWToMORConvertedTableRollback(true);
   }
 
-  private void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
-    init(baseFileFormat);
-
+  private void testRollbackWithDeltaAndCompactionCommit(Boolean rollbackUsingMarkers) throws Exception {
     HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
+
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       // Test delta commit rollback
@@ -604,23 +639,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testRollbackWithDeltaAndCompactionCommitUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
-    testRollbackWithDeltaAndCompactionCommit(baseFileFormat, false);
+  @Test
+  public void testRollbackWithDeltaAndCompactionCommitUsingFileList() throws Exception {
+    testRollbackWithDeltaAndCompactionCommit(false);
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testRollbackWithDeltaAndCompactionCommitUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
-    testRollbackWithDeltaAndCompactionCommit(baseFileFormat, true);
+  @Test
+  public void testRollbackWithDeltaAndCompactionCommitUsingMarkers() throws Exception {
+    testRollbackWithDeltaAndCompactionCommit(true);
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testMultiRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
-    init(baseFileFormat);
-
+  @Test
+  public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
     HoodieWriteConfig cfg = getConfig(false);
     try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       /**
@@ -777,15 +807,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withEmbeddedTimelineServerEnabled(true)
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024).build()).forTable("test-trip-table")
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024).parquetMaxFileSize(1024).build()).forTable("test-trip-table")
         .build();
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testUpsertPartitioner(HoodieFileFormat baseFileFormat) throws Exception {
-    init(baseFileFormat);
-
+  @Test
+  public void testUpsertPartitioner() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
@@ -863,11 +890,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testLogFileCountsAfterCompaction(HoodieFileFormat baseFileFormat) throws Exception {
-    init(baseFileFormat);
-
+  @Test
+  public void testLogFileCountsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig(true);
     try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
@@ -939,11 +963,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testSimpleInsertsGeneratedIntoLogFiles(HoodieFileFormat baseFileFormat) throws Exception {
-    init(baseFileFormat);
-
+  @Test
+  public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -979,10 +1000,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  private void testInsertsGeneratedIntoLogFilesRollback(HoodieFileFormat baseFileFormat,
-                                                        Boolean rollbackUsingMarkers) throws Exception {
-    init(baseFileFormat);
-
+  private void testInsertsGeneratedIntoLogFilesRollback(Boolean rollbackUsingMarkers) throws Exception {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build();
@@ -1069,22 +1087,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testInsertsGeneratedIntoLogFilesRollbackUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
-    testInsertsGeneratedIntoLogFilesRollback(baseFileFormat, false);
+  @Test
+  public void testInsertsGeneratedIntoLogFilesRollbackUsingFileList() throws Exception {
+    testInsertsGeneratedIntoLogFilesRollback(false);
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testInsertsGeneratedIntoLogFilesRollbackUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
-    testInsertsGeneratedIntoLogFilesRollback(baseFileFormat, true);
+  @Test
+  public void testInsertsGeneratedIntoLogFilesRollbackUsingMarkers() throws Exception {
+    testInsertsGeneratedIntoLogFilesRollback(true);
   }
 
-  private void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(HoodieFileFormat baseFileFormat,
-                                                                       Boolean rollbackUsingMarkers) throws Exception {
-    init(baseFileFormat);
-
+  private void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(Boolean rollbackUsingMarkers) throws Exception {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build();
@@ -1135,23 +1148,20 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
-    testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(baseFileFormat, false);
+  @Test
+  public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingFileList() throws Exception {
+    testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(false);
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
-    testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(baseFileFormat, true);
+  @Test
+  public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingMarkers() throws Exception {
+    testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(true);
   }
 
   /**
    * Test to ensure metadata stats are correctly written to metadata file.
    */
-  public void testMetadataStatsOnCommit(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
-    init(baseFileFormat);
+  public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY)
         .withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -1231,26 +1241,21 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   /**
    * Test to ensure rolling stats are correctly written to metadata file.
    */
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testMetadataStatsOnCommitUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
-    testMetadataStatsOnCommit(baseFileFormat, false);
+  @Test
+  public void testMetadataStatsOnCommitUsingFileList() throws Exception {
+    testMetadataStatsOnCommit(false);
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testMetadataStatsOnCommitUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
-    testMetadataStatsOnCommit(baseFileFormat, true);
+  @Test
+  public void testMetadataStatsOnCommitUsingMarkers() throws Exception {
+    testMetadataStatsOnCommit(true);
   }
 
   /**
    * Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them.
    */
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testMetadataStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
-    init(baseFileFormat);
-
+  @Test
+  public void testRollingStatsWithSmallFileHandling() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       Map<String, Long> fileIdToInsertsMap = new HashMap<>();
@@ -1364,11 +1369,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   /**
    * Test to validate invoking table.handleUpdate() with input records from multiple partitions will fail.
    */
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testHandleUpdateWithMultiplePartitions(HoodieFileFormat baseFileFormat) throws Exception {
-    init(baseFileFormat);
-
+  @Test
+  public void testHandleUpdateWithMultiplePartitions() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
@@ -1467,7 +1469,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
         .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
             .withEnableBackupForRemoteFileSystemView(false).build())
@@ -1606,6 +1608,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         } else {
           return ((HoodieParquetInputFormat)inputFormat).listStatus(jobConf);
         }
+      case HFILE:
+        if (realtime) {
+          return ((HoodieHFileRealtimeInputFormat)inputFormat).listStatus(jobConf);
+        } else {
+          return ((HoodieHFileInputFormat)inputFormat).listStatus(jobConf);
+        }
       default:
         throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
     }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index cc78684..564ed08 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -366,7 +366,8 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
   @Test
   public void testFileSizeUpsertRecords() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(HoodieStorageConfig.newBuilder()
-        .limitFileSize(64 * 1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build();
+        .parquetMaxFileSize(64 * 1024).hfileMaxFileSize(64 * 1024)
+        .parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build();
     String instantTime = HoodieTestUtils.makeNewCommitTime();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
@@ -401,7 +402,8 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
   @Test
   public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder()
-            .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
+            .withStorageConfig(HoodieStorageConfig.newBuilder()
+                .parquetMaxFileSize(1000 * 1024).hfileMaxFileSize(1000 * 1024).build()).build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     final HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
     String instantTime = "000";
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index b8df5ef..138f60e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -71,7 +71,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder()
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
             .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build())
+        .build();
 
     FileCreateUtils.createCommit(basePath, "001");
     FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index 697febe..bd8c8c8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -72,7 +72,8 @@ public class CompactionTestBase extends HoodieClientTestBase {
         .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
@@ -194,7 +195,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
     assertEquals(latestCompactionCommitTime, compactionInstantTime,
         "Expect compaction instant time to be the latest commit time");
     assertEquals(expectedNumRecs,
-        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
+        HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"),
         "Must contain expected records");
 
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c044bee..09a9cca 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -95,7 +95,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
         .withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
             .withInlineCompaction(false).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
         .withMemoryConfig(HoodieMemoryConfig.newBuilder().withMaxDFSStreamBufferSize(1 * 1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 203cc54..bd933ab 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -130,7 +130,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
         .withWriteStatusClass(MetadataMergeWriteStatus.class)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
         .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
@@ -459,12 +459,12 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
 
       // Check that the incremental consumption from prevCommitTime
       assertEquals(HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count(),
+          HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, prevCommitTime),
           "Incremental consumption from " + prevCommitTime + " should give all records in latest commit");
       if (commitTimesBetweenPrevAndNew.isPresent()) {
         commitTimesBetweenPrevAndNew.get().forEach(ct -> {
           assertEquals(HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-              HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count(),
+              HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, ct),
               "Incremental consumption from " + ct + " should give all records in latest commit");
         });
       }
@@ -527,7 +527,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
 
       // Check that the incremental consumption from prevCommitTime
       assertEquals(HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count(),
+          HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, prevCommitTime),
           "Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
               + " since it is a delete operation");
     }
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 353b34c..c4c67fa 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -45,6 +46,10 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -57,12 +62,15 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 
 import java.io.IOException;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Utility methods to aid testing inside the HoodieClient module.
@@ -129,7 +137,7 @@ public class HoodieClientTestUtils {
   /**
    * Obtain all new data written into the Hoodie table since the given timestamp.
    */
-  public static Dataset<Row> readSince(String basePath, SQLContext sqlContext,
+  public static long countRecordsSince(JavaSparkContext jsc, String basePath, SQLContext sqlContext,
                                        HoodieTimeline commitTimeline, String lastCommitTime) {
     List<HoodieInstant> commitsToReturn =
         commitTimeline.findInstantsAfter(lastCommitTime, Integer.MAX_VALUE).getInstants().collect(Collectors.toList());
@@ -137,12 +145,17 @@ public class HoodieClientTestUtils {
       // Go over the commit metadata, and obtain the new files that need to be read.
       HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
       String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]);
-      Dataset<Row> rows = null;
       if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-        rows = sqlContext.read().parquet(paths);
+        return sqlContext.read().parquet(paths)
+            .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime))
+            .count();
+      } else if (paths[0].endsWith(HoodieFileFormat.HFILE.getFileExtension())) {
+        return readHFile(jsc, paths)
+            .filter(gr -> HoodieTimeline.compareTimestamps(lastCommitTime, HoodieActiveTimeline.LESSER_THAN,
+                gr.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()))
+            .count();
       }
-
-      return rows.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime));
+      throw new HoodieException("Unsupported base file format for file :" + paths[0]);
     } catch (IOException e) {
       throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + lastCommitTime, e);
     }
@@ -170,6 +183,37 @@ public class HoodieClientTestUtils {
     }
   }
 
+  public static Stream<GenericRecord> readHFile(JavaSparkContext jsc, String[] paths) {
+    // TODO: this should be ported to use HoodieStorageReader
+    List<GenericRecord> valuesAsList = new LinkedList<>();
+
+    FileSystem fs = FSUtils.getFs(paths[0], jsc.hadoopConfiguration());
+    CacheConfig cacheConfig = new CacheConfig(fs.getConf());
+    Schema schema = null;
+    for (String path : paths) {
+      try {
+        HFile.Reader reader = HFile.createReader(fs, new Path(path), cacheConfig, fs.getConf());
+        if (schema == null) {
+          schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get("schema".getBytes())));
+        }
+        HFileScanner scanner = reader.getScanner(false, false);
+        if (!scanner.seekTo()) {
+          // EOF reached
+          continue;
+        }
+
+        do {
+          Cell c = scanner.getKeyValue();
+          byte[] value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
+          valuesAsList.add(HoodieAvroUtils.bytesToAvro(value, schema));
+        } while (scanner.next());
+      } catch (IOException e) {
+        throw new HoodieException("Error reading hfile " + path + " as a dataframe", e);
+      }
+    }
+    return valuesAsList.stream();
+  }
+
   /**
    * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
    */
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
index 83b966a..8d7c094 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
@@ -166,7 +166,7 @@ public class SparkDatasetTestUtils {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withBulkInsertParallelism(2);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 7247c41..57dde9f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -35,6 +35,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
@@ -84,7 +85,11 @@ public class HoodieAvroUtils {
    * Convert a given avro record to bytes.
    */
   public static byte[] avroToBytes(GenericRecord record) {
-    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
+    return indexedRecordToBytes(record);
+  }
+
+  public static <T extends IndexedRecord> byte[] indexedRecordToBytes(T record) {
+    GenericDatumWriter<T> writer = new GenericDatumWriter<>(record.getSchema());
     try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, reuseEncoder.get());
       reuseEncoder.set(encoder);
@@ -115,9 +120,16 @@ public class HoodieAvroUtils {
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOException {
+    return bytesToAvro(bytes, schema, schema);
+  }
+
+  /**
+   * Convert serialized bytes back into avro record.
+   */
+  public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {
     BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, reuseDecoder.get());
     reuseDecoder.set(decoder);
-    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
     return reader.read(null, decoder);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 806d55f..52483ea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.SpillableMapUtils;
@@ -145,6 +146,7 @@ public abstract class AbstractHoodieLogRecordScanner {
           break;
         }
         switch (r.getBlockType()) {
+          case HFILE_DATA_BLOCK:
           case AVRO_DATA_BLOCK:
             LOG.info("Reading a data block from file " + logFile.getPath());
             if (isNewInstantBlock(r) && !readBlocksLazily) {
@@ -305,6 +307,9 @@ public abstract class AbstractHoodieLogRecordScanner {
         case AVRO_DATA_BLOCK:
           processDataBlock((HoodieAvroDataBlock) lastBlock);
           break;
+        case HFILE_DATA_BLOCK:
+          processDataBlock((HoodieHFileDataBlock) lastBlock);
+          break;
         case DELETE_BLOCK:
           Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
           break;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 38bf83c..5d2e185 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
@@ -179,6 +180,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
 
     // 8. Read log block length, if present. This acts as a reverse pointer when traversing a
     // log file in reverse
+    @SuppressWarnings("unused")
     long logBlockLength = 0;
     if (nextBlockVersion.hasLogBlockLength()) {
       logBlockLength = inputStream.readLong();
@@ -196,6 +198,9 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
           return new HoodieAvroDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
               contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
         }
+      case HFILE_DATA_BLOCK:
+        return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
+              contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
       case DELETE_BLOCK:
         return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
             contentPosition, contentLength, blockEndPos, header, footer);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 1a70fc3..8f5b741 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -77,6 +77,8 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     switch (logDataBlockFormat) {
       case AVRO_DATA_BLOCK:
         return new HoodieAvroDataBlock(recordList, header);
+      case HFILE_DATA_BLOCK:
+        return new HoodieHFileDataBlock(recordList, header);
       default:
         throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
new file mode 100644
index 0000000..61d9b7f
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieHFileReader;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nonnull;
+
+/**
+ * HoodieHFileDataBlock contains a list of records stored inside an HFile format. It is used with the HFile
+ * base file format.
+ */
+public class HoodieHFileDataBlock extends HoodieDataBlock {
+  private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class);
+  private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ;
+  private static int blockSize = 1 * 1024 * 1024;
+
+  public HoodieHFileDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
+       @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+       @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
+       FSDataInputStream inputStream, boolean readBlockLazily) {
+    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
+  }
+
+  public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
+       boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
+       Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
+    super(content, inputStream, readBlockLazily,
+          Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
+          footer);
+  }
+
+  public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
+    super(records, header, new HashMap<>());
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.HFILE_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords() throws IOException {
+    HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).withCompression(compressionAlgorithm)
+        .build();
+    Configuration conf = new Configuration();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+        .withOutputStream(ostream).withFileContext(context).create();
+
+    // Serialize records into bytes
+    Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
+    Iterator<IndexedRecord> itr = records.iterator();
+    boolean useIntegerKey = false;
+    int key = 0;
+    int keySize = 0;
+    Field keyField = records.get(0).getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    if (keyField == null) {
+      // Missing key metadata field so we should use an integer sequence key
+      useIntegerKey = true;
+      keySize = (int) Math.ceil(Math.log(records.size())) + 1;
+    }
+    while (itr.hasNext()) {
+      IndexedRecord record = itr.next();
+      String recordKey;
+      if (useIntegerKey) {
+        recordKey = String.format("%" + keySize + "s", key++);
+      } else {
+        recordKey = record.get(keyField.pos()).toString();
+      }
+      byte[] recordBytes = HoodieAvroUtils.indexedRecordToBytes(record);
+      sortedRecordsMap.put(recordKey, recordBytes);
+    }
+
+    // Write the records
+    sortedRecordsMap.forEach((recordKey, recordBytes) -> {
+      try {
+        KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, recordBytes);
+        writer.append(kv);
+      } catch (IOException e) {
+        throw new HoodieIOException("IOException serializing records", e);
+      }
+    });
+
+    writer.close();
+    ostream.flush();
+    ostream.close();
+
+    return baos.toByteArray();
+  }
+
+  @Override
+  protected void deserializeRecords() throws IOException {
+    // Get schema from the header
+    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+    // If readerSchema was not present, use writerSchema
+    if (schema == null) {
+      schema = writerSchema;
+    }
+
+    // Read the content
+    HoodieHFileReader reader = new HoodieHFileReader<>(getContent().get());
+    List<Pair<String, IndexedRecord>> records = reader.readAllRecords(writerSchema, schema);
+    this.records = records.stream().map(t -> t.getSecond()).collect(Collectors.toList());
+
+    // Free up content to be GC'd, deflate
+    deflate();
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index b1a88c1..1d185e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -110,7 +110,7 @@ public abstract class HoodieLogBlock {
    * Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
    */
   public enum HoodieLogBlockType {
-    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK
+    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
index 0e5ead9..fefe7eb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.util.Option;
 
 public interface HoodieFileReader<R extends IndexedRecord> {
 
@@ -34,7 +35,19 @@ public interface HoodieFileReader<R extends IndexedRecord> {
 
   public Set<String> filterRowKeys(Set<String> candidateRowKeys);
 
-  public Iterator<R> getRecordIterator(Schema schema) throws IOException;
+  public Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
+
+  default Iterator<R> getRecordIterator() throws IOException {
+    return getRecordIterator(getSchema());
+  }
+
+  default Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  default Option<R> getRecordByKey(String key) throws IOException {
+    return getRecordByKey(key, getSchema());
+  }
 
   Schema getSchema();
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index 1ad85d3..3c97b36 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -24,10 +24,12 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 
 import java.io.IOException;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 
 public class HoodieFileReaderFactory {
 
@@ -37,6 +39,10 @@ public class HoodieFileReaderFactory {
     if (PARQUET.getFileExtension().equals(extension)) {
       return newParquetFileReader(conf, path);
     }
+    if (HFILE.getFileExtension().equals(extension)) {
+      return newHFileFileReader(conf, path);
+    }
+
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
 
@@ -44,4 +50,10 @@ public class HoodieFileReaderFactory {
       Configuration conf, Path path) throws IOException {
     return new HoodieParquetReader<>(conf, path);
   }
+
+  private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileReader<R> newHFileFileReader(
+      Configuration conf, Path path) throws IOException {
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    return new HoodieHFileReader<>(conf, path, cacheConfig);
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
new file mode 100644
index 0000000..1d76929
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileReader {
+  private Path path;
+  private Configuration conf;
+  private HFile.Reader reader;
+  private Schema schema;
+
+  public static final String KEY_SCHEMA = "schema";
+  public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+  public static final String KEY_BLOOM_FILTER_TYPE_CODE = "bloomFilterTypeCode";
+  public static final String KEY_MIN_RECORD = "minRecordKey";
+  public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig) throws IOException {
+    this.conf = configuration;
+    this.path = path;
+    this.reader = HFile.createReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
+  }
+
+  public HoodieHFileReader(byte[] content) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path("hoodie");
+    SeekableByteArrayInputStream bis = new SeekableByteArrayInputStream(content);
+    FSDataInputStream fsdis = new FSDataInputStream(bis);
+    this.reader = HFile.createReader(FSUtils.getFs("hoodie", conf), path, new FSDataInputStreamWrapper(fsdis),
+        content.length, new CacheConfig(conf), conf);
+  }
+
+  @Override
+  public String[] readMinMaxRecordKeys() {
+    try {
+      Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+      return new String[] { new String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
+          new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
+    } catch (IOException e) {
+      throw new HoodieException("Could not read min/max record key out of file information block correctly from path", e);
+    }
+  }
+
+  @Override
+  public Schema getSchema() {
+    if (schema == null) {
+      try {
+        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+        schema = new Schema.Parser().parse(new String(fileInfo.get(KEY_SCHEMA.getBytes())));
+      } catch (IOException e) {
+        throw new HoodieException("Could not read schema of file from path", e);
+      }
+    }
+
+    return schema;
+  }
+
+  @Override
+  public BloomFilter readBloomFilter() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      ByteBuffer serializedFilter = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false);
+      byte[] filterBytes = new byte[serializedFilter.remaining()];
+      serializedFilter.get(filterBytes); // read the bytes that were written
+      return BloomFilterFactory.fromString(new String(filterBytes),
+          new String(fileInfo.get(KEY_BLOOM_FILTER_TYPE_CODE.getBytes())));
+    } catch (IOException e) {
+      throw new HoodieException("Could not read bloom filter from " + path, e);
+    }
+  }
+
+  @Override
+  public Set<String> filterRowKeys(Set candidateRowKeys) {
+    // Current implementation reads all records and filters them. In certain cases, it many be better to:
+    //  1. Scan a limited subset of keys (min/max range of candidateRowKeys)
+    //  2. Lookup keys individually (if the size of candidateRowKeys is much less than the total keys in file)
+    try {
+      List<Pair<String, R>> allRecords = readAllRecords();
+      Set<String> rowKeys = new HashSet<>();
+      allRecords.forEach(t -> {
+        if (candidateRowKeys.contains(t.getFirst())) {
+          rowKeys.add(t.getFirst());
+        }
+      });
+      return rowKeys;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read row keys from " + path, e);
+    }
+  }
+
+  public List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException {
+    List<Pair<String, R>> recordList = new LinkedList<>();
+    try {
+      HFileScanner scanner = reader.getScanner(false, false);
+      if (scanner.seekTo()) {
+        do {
+          Cell c = scanner.getKeyValue();
+          byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength());
+          R record = getRecordFromCell(c, writerSchema, readerSchema);
+          recordList.add(new Pair<>(new String(keyBytes), record));
+        } while (scanner.next());
+      }
+
+      return recordList;
+    } catch (IOException e) {
+      throw new HoodieException("Error reading hfile " + path + " as a dataframe", e);
+    }
+  }
+
+  public List<Pair<String, R>> readAllRecords() throws IOException {
+    Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes())));
+    return readAllRecords(schema, schema);
+  }
+
+  @Override
+  public Iterator getRecordIterator(Schema readerSchema) throws IOException {
+    final HFileScanner scanner = reader.getScanner(false, false);
+    return new Iterator<R>() {
+      private R next = null;
+      private boolean eof = false;
+
+      @Override
+      public boolean hasNext() {
+        try {
+          // To handle when hasNext() is called multiple times for idempotency and/or the first time
+          if (this.next == null && !this.eof) {
+            if (!scanner.isSeeked() && scanner.seekTo()) {
+                this.next = (R)getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema);
+            }
+          }
+          return this.next != null;
+        } catch (IOException io) {
+          throw new HoodieIOException("unable to read next record from hfile ", io);
+        }
+      }
+
+      @Override
+      public R next() {
+        try {
+          // To handle case when next() is called before hasNext()
+          if (this.next == null) {
+            if (!hasNext()) {
+              throw new HoodieIOException("No more records left to read from hfile");
+            }
+          }
+          R retVal = this.next;
+          if (scanner.next()) {
+            this.next = (R)getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema);
+          } else {
+            this.next = null;
+            this.eof = true;
+          }
+          return retVal;
+        } catch (IOException io) {
+          throw new HoodieIOException("unable to read next record from parquet file ", io);
+        }
+      }
+    };
+  }
+
+  @Override
+  public Option getRecordByKey(String key, Schema readerSchema) throws IOException {
+    HFileScanner scanner = reader.getScanner(false, true);
+    KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+    if (scanner.seekTo(kv) == 0) {
+      Cell c = scanner.getKeyValue();
+      byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength());
+      R record = getRecordFromCell(c, getSchema(), readerSchema);
+      return Option.of(record);
+    }
+
+    return Option.empty();
+  }
+
+  private R getRecordFromCell(Cell c, Schema writerSchema, Schema readerSchema) throws IOException {
+    byte[] value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
+    return (R)HoodieAvroUtils.bytesToAvro(value, writerSchema, readerSchema);
+  }
+
+  @Override
+  public long getTotalRecords() {
+    return reader.getEntries();
+  }
+
+  @Override
+  public void close() {
+    try {
+      reader.close();
+      reader = null;
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
+    public SeekableByteArrayInputStream(byte[] buf) {
+      super(buf);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      if (mark != 0) {
+        throw new IllegalStateException();
+      }
+
+      reset();
+      long skipped = skip(pos);
+
+      if (skipped != pos) {
+        throw new IOException();
+      }
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+
+      if (position >= buf.length) {
+        throw new IllegalArgumentException();
+      }
+      if (position + length > buf.length) {
+        throw new IllegalArgumentException();
+      }
+      if (length > buffer.length) {
+        throw new IllegalArgumentException();
+      }
+
+      System.arraycopy(buf, (int) position, buffer, offset, length);
+      return length;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+      read(position, buffer, 0, buffer.length);
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+      read(position, buffer, offset, length);
+    }
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java
deleted file mode 100644
index 940ae87..0000000
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.functional;
-
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
-
-/**
- * Tests Avro log format {@link HoodieAvroDataBlock}.
- */
-public class TestHoodieAvroLogFormat extends TestHoodieLogFormat {
-  public TestHoodieAvroLogFormat() {
-    super(HoodieLogBlockType.AVRO_DATA_BLOCK);
-  }
-}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 6214af1..1d7ca97 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
@@ -56,6 +57,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -82,20 +84,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * Tests hoodie log format {@link HoodieLogFormat}.
  */
 @SuppressWarnings("Duplicates")
-public abstract class TestHoodieLogFormat extends HoodieCommonTestHarness {
+public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
   private static String BASE_OUTPUT_PATH = "/tmp/";
   private FileSystem fs;
   private Path partitionPath;
   private int bufferSize = 4096;
-  private HoodieLogBlockType dataBlockType;
-
-  public TestHoodieLogFormat(HoodieLogBlockType dataBlockType) {
-    this.dataBlockType = dataBlockType;
-  }
-
-  private TestHoodieLogFormat() {
-  }
+  private HoodieLogBlockType dataBlockType = HoodieLogBlockType.AVRO_DATA_BLOCK;
 
   @BeforeAll
   public static void setUpClass() throws IOException, InterruptedException {
@@ -133,8 +128,9 @@ public abstract class TestHoodieLogFormat extends HoodieCommonTestHarness {
     assertEquals(1, writer.getLogFile().getLogVersion(), "Version should be 1 for new log created");
   }
 
-  @Test
-  public void testBasicAppend() throws IOException, InterruptedException, URISyntaxException {
+  @ParameterizedTest
+  @EnumSource(names = { "AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK" })
+  public void testBasicAppend(HoodieLogBlockType dataBlockType) throws IOException, InterruptedException, URISyntaxException {
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
@@ -142,7 +138,7 @@ public abstract class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieDataBlock dataBlock = getDataBlock(records, header);
+    HoodieDataBlock dataBlock = getDataBlock(dataBlockType, records, header);
     writer = writer.appendBlock(dataBlock);
     long size = writer.getCurrentSize();
     assertTrue(size > 0, "We just wrote a block - size should be > 0");
@@ -151,7 +147,8 @@ public abstract class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer.close();
   }
 
-  @Test
+  @ParameterizedTest
+  @EnumSource(names = { "AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK" })
   public void testRollover() throws IOException, InterruptedException, URISyntaxException {
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
@@ -335,7 +332,8 @@ public abstract class TestHoodieLogFormat extends HoodieCommonTestHarness {
     assertEquals(2, statuses.length);
   }
 
-  @Test
+  @ParameterizedTest
+  @EnumSource(names = { "AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK" })
   public void testBasicWriteAndScan() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
@@ -363,7 +361,8 @@ public abstract class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
   }
 
-  @Test
+  @ParameterizedTest
+  @EnumSource(names = { "AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK" })
   public void testBasicAppendAndRead() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
@@ -1440,9 +1439,16 @@ public abstract class TestHoodieLogFormat extends HoodieCommonTestHarness {
   }
 
   private HoodieDataBlock getDataBlock(List<IndexedRecord> records, Map<HeaderMetadataType, String> header) {
+    return getDataBlock(dataBlockType, records, header);
+  }
+
+  private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
+                                       Map<HeaderMetadataType, String> header) {
     switch (dataBlockType) {
       case AVRO_DATA_BLOCK:
         return new HoodieAvroDataBlock(records, header);
+      case HFILE_DATA_BLOCK:
+        return new HoodieHFileDataBlock(records, header);
       default:
         throw new RuntimeException("Unknown data block type " + dataBlockType);
     }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
new file mode 100644
index 0000000..1747888
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieInputFormat for HUDI datasets which store data in HFile base file format.
+ */
+@UseFileSplitsFromInputFormat
+public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, ArrayWritable> implements Configurable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieHFileInputFormat.class);
+
+  protected Configuration conf;
+
+  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
+  }
+
+  @Override
+  public FileStatus[] listStatus(JobConf job) throws IOException {
+    // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
+    List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job));
+    InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
+    List<FileStatus> returns = new ArrayList<>();
+
+    Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
+    // process incremental pulls first
+    for (String table : incrementalTables) {
+      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+      if (metaClient == null) {
+        /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
+         * in the jobConf
+         */
+        continue;
+      }
+      List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
+      List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
+      if (result != null) {
+        returns.addAll(result);
+      }
+    }
+
+    // process non hoodie Paths next.
+    List<Path> nonHoodiePaths = inputPathHandler.getNonHoodieInputPaths();
+    if (nonHoodiePaths.size() > 0) {
+      setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()]));
+      FileStatus[] fileStatuses = super.listStatus(job);
+      returns.addAll(Arrays.asList(fileStatuses));
+    }
+
+    // process snapshot queries next.
+    List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
+    if (snapshotPaths.size() > 0) {
+      setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
+      FileStatus[] fileStatuses = super.listStatus(job);
+      Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
+          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, HoodieFileFormat.HFILE.getFileExtension(),
+              tableMetaClientMap.values());
+      LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
+      for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
+        List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
+        if (result != null) {
+          returns.addAll(result);
+        }
+      }
+    }
+    return returns.toArray(new FileStatus[returns.size()]);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  private List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = super.listStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  @Override
+  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
+      final Reporter reporter) throws IOException {
+    return new HoodieHFileRecordReader(conf, split, job);
+  }
+
+  @Override
+  protected boolean isSplitable(FileSystem fs, Path filename) {
+    // This file isn't splittable.
+    return false;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
new file mode 100644
index 0000000..53ccb74
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.io.storage.HoodieHFileReader;
+
+public class HoodieHFileRecordReader implements RecordReader<NullWritable, ArrayWritable> {
+
+  private long count = 0;
+  private ArrayWritable valueObj;
+  private HoodieHFileReader reader;
+  private Iterator<GenericRecord> recordIterator;
+  private Schema schema;
+
+  public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException {
+    FileSplit fileSplit = (FileSplit) split;
+    Path path = fileSplit.getPath();
+    reader = new HoodieHFileReader(conf, path, new CacheConfig(conf));
+
+    schema = reader.getSchema();
+    valueObj = new ArrayWritable(Writable.class, new Writable[schema.getFields().size()]);
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws IOException {
+    if (recordIterator == null) {
+      recordIterator = reader.getRecordIterator(schema);
+    }
+
+    if (!recordIterator.hasNext()) {
+      return false;
+    }
+
+    GenericRecord record = recordIterator.next();
+    ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schema);
+    value.set(aWritable.get());
+    count++;
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return null;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return valueObj;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return 1.0f * count / reader.getTotalRecords();
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
index 1ad3812..0a5055a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
@@ -51,7 +51,7 @@ public class InputPathHandler {
   private final List<Path> snapshotPaths;
   private final List<Path> nonHoodieInputPaths;
 
-  InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incrementalTables) throws IOException {
+  public InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incrementalTables) throws IOException {
     this.conf = conf;
     tableMetaClientMap = new HashMap<>();
     snapshotPaths = new ArrayList<>();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
similarity index 50%
copy from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
copy to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
index 9cb0ada..e75cff6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
@@ -18,17 +18,10 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Stream;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
@@ -38,42 +31,34 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.HoodieHFileInputFormat;
+import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
 import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Stream;
-
 /**
- * Input Format, that provides a real-time view of data in a Hoodie table.
+ * HoodieRealtimeInputFormat for HUDI datasets which store data in HFile base file format.
  */
 @UseRecordReaderFromInputFormat
 @UseFileSplitsFromInputFormat
-public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
+public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
 
-  // To make Hive on Spark queries work with RT tables. Our theory is that due to
-  // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
-  // not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
-  // times which ultimately breaks the query.
+  private static final Logger LOG = LogManager.getLogger(HoodieHFileRealtimeInputFormat.class);
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
-
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
   @Override
   public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
-    // timeline.
+    // Call the HoodieInputFormat::listStatus to obtain all latest hfiles, based on commit timeline.
     return super.listStatus(job);
   }
 
@@ -83,71 +68,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     return timeline;
   }
 
-  /**
-   * Add a field to the existing fields projected.
-   */
-  private static Configuration addProjectionField(Configuration conf, String fieldName, int fieldIndex) {
-    String readColNames = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
-    String readColIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "");
-
-    String readColNamesPrefix = readColNames + ",";
-    if (readColNames == null || readColNames.isEmpty()) {
-      readColNamesPrefix = "";
-    }
-    String readColIdsPrefix = readColIds + ",";
-    if (readColIds == null || readColIds.isEmpty()) {
-      readColIdsPrefix = "";
-    }
-
-    if (!readColNames.contains(fieldName)) {
-      // If not already in the list - then add it
-      conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNamesPrefix + fieldName);
-      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Adding extra column " + fieldName + ", to enable log merging cols (%s) ids (%s) ",
-            conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
-            conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
-      }
-    }
-    return conf;
-  }
-
-  private static void addRequiredProjectionFields(Configuration configuration) {
-    List<Integer> projectedIds = new ArrayList<>(HoodieColumnProjectionUtils.getReadColumnIDs(configuration));
-    List<String> projectedNames = new ArrayList<>(Arrays.asList(HoodieColumnProjectionUtils.getReadColumnNames(configuration)));
-    projectedIds.addAll(Arrays.asList(
-        HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS,
-        HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS,
-        HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS)
-    );
-    projectedNames.addAll(Arrays.asList(
-        HoodieRecord.RECORD_KEY_METADATA_FIELD,
-        HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-        HoodieRecord.PARTITION_PATH_METADATA_FIELD)
-    );
-
-    HoodieColumnProjectionUtils.setReadColumns(configuration, projectedIds, projectedNames);
-  }
-
-  /**
-   * Hive will append read columns' ids to old columns' ids during getRecordReader. In some cases, e.g. SELECT COUNT(*),
-   * the read columns' id is an empty string and Hive will combine it with Hoodie required projection ids and becomes
-   * e.g. ",2,0,3" and will cause an error. Actually this method is a temporary solution because the real bug is from
-   * Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438)
-   */
-  private static void cleanProjectionColumnIds(Configuration conf) {
-    String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
-    if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
-      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
-      }
-    }
-  }
-
   @Override
   public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
-                                                                   final Reporter reporter) throws IOException {
+      final Reporter reporter) throws IOException {
     // Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
     // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
@@ -166,8 +89,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
           // For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
           // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
           // time.
-          cleanProjectionColumnIds(jobConf);
-          addRequiredProjectionFields(jobConf);
+          HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
+          HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
 
           this.conf = jobConf;
           this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
@@ -177,17 +100,11 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
 
     LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-
     // sanity check
-    ValidationUtils.checkArgument(split instanceof RealtimeSplit,
-        "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
+    ValidationUtils.checkArgument(split instanceof HoodieRealtimeFileSplit,
+        "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split);
 
-    return new HoodieRealtimeRecordReader((RealtimeSplit) split, jobConf,
+    return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, jobConf,
         super.getRecordReader(split, jobConf, reporter));
   }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-}
+}
\ No newline at end of file
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index 9cb0ada..5bcfbe9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -18,10 +18,8 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
@@ -43,9 +41,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.stream.Stream;
 
 /**
@@ -83,68 +79,6 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     return timeline;
   }
 
-  /**
-   * Add a field to the existing fields projected.
-   */
-  private static Configuration addProjectionField(Configuration conf, String fieldName, int fieldIndex) {
-    String readColNames = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
-    String readColIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "");
-
-    String readColNamesPrefix = readColNames + ",";
-    if (readColNames == null || readColNames.isEmpty()) {
-      readColNamesPrefix = "";
-    }
-    String readColIdsPrefix = readColIds + ",";
-    if (readColIds == null || readColIds.isEmpty()) {
-      readColIdsPrefix = "";
-    }
-
-    if (!readColNames.contains(fieldName)) {
-      // If not already in the list - then add it
-      conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNamesPrefix + fieldName);
-      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Adding extra column " + fieldName + ", to enable log merging cols (%s) ids (%s) ",
-            conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
-            conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
-      }
-    }
-    return conf;
-  }
-
-  private static void addRequiredProjectionFields(Configuration configuration) {
-    List<Integer> projectedIds = new ArrayList<>(HoodieColumnProjectionUtils.getReadColumnIDs(configuration));
-    List<String> projectedNames = new ArrayList<>(Arrays.asList(HoodieColumnProjectionUtils.getReadColumnNames(configuration)));
-    projectedIds.addAll(Arrays.asList(
-        HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS,
-        HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS,
-        HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS)
-    );
-    projectedNames.addAll(Arrays.asList(
-        HoodieRecord.RECORD_KEY_METADATA_FIELD,
-        HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-        HoodieRecord.PARTITION_PATH_METADATA_FIELD)
-    );
-
-    HoodieColumnProjectionUtils.setReadColumns(configuration, projectedIds, projectedNames);
-  }
-
-  /**
-   * Hive will append read columns' ids to old columns' ids during getRecordReader. In some cases, e.g. SELECT COUNT(*),
-   * the read columns' id is an empty string and Hive will combine it with Hoodie required projection ids and becomes
-   * e.g. ",2,0,3" and will cause an error. Actually this method is a temporary solution because the real bug is from
-   * Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438)
-   */
-  private static void cleanProjectionColumnIds(Configuration conf) {
-    String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
-    if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
-      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
-      }
-    }
-  }
-
   @Override
   public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
                                                                    final Reporter reporter) throws IOException {
@@ -166,8 +100,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
           // For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
           // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
           // time.
-          cleanProjectionColumnIds(jobConf);
-          addRequiredProjectionFields(jobConf);
+          HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
+          HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
 
           this.conf = jobConf;
           this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index bf46f46..f2c090d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -33,8 +33,10 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
+import org.apache.hudi.hadoop.HoodieHFileInputFormat;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
+import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -82,6 +84,16 @@ public class HoodieInputFormatUtils {
           inputFormat.setConf(conf);
           return inputFormat;
         }
+      case HFILE:
+        if (realtime) {
+          HoodieHFileRealtimeInputFormat inputFormat = new HoodieHFileRealtimeInputFormat();
+          inputFormat.setConf(conf);
+          return inputFormat;
+        } else {
+          HoodieHFileInputFormat inputFormat = new HoodieHFileInputFormat();
+          inputFormat.setConf(conf);
+          return inputFormat;
+        }
       default:
         throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
     }
@@ -96,6 +108,8 @@ public class HoodieInputFormatUtils {
     switch (baseFileFormat) {
       case PARQUET:
         return MapredParquetOutputFormat.class.getName();
+      case HFILE:
+        return MapredParquetOutputFormat.class.getName();
       default:
         throw new HoodieIOException("No OutputFormat for base file format " + baseFileFormat);
     }
@@ -105,6 +119,8 @@ public class HoodieInputFormatUtils {
     switch (baseFileFormat) {
       case PARQUET:
         return ParquetHiveSerDe.class.getName();
+      case HFILE:
+        return ParquetHiveSerDe.class.getName();
       default:
         throw new HoodieIOException("No SerDe for base file format " + baseFileFormat);
     }
@@ -115,6 +131,9 @@ public class HoodieInputFormatUtils {
     if (extension.equals(HoodieFileFormat.PARQUET.getFileExtension())) {
       return getInputFormat(HoodieFileFormat.PARQUET, realtime, conf);
     }
+    if (extension.equals(HoodieFileFormat.HFILE.getFileExtension())) {
+      return getInputFormat(HoodieFileFormat.HFILE, realtime, conf);
+    }
     throw new HoodieIOException("Hoodie InputFormat not implemented for base file of type " + extension);
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 346d7a0..760dd96 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -36,6 +37,7 @@ import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.SplitLocationInfo;
@@ -165,4 +167,56 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
     return resultMap;
   }
 
+
+  /**
+   * Add a field to the existing fields projected.
+   */
+  private static Configuration addProjectionField(Configuration conf, String fieldName, int fieldIndex) {
+    String readColNames = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
+    String readColIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "");
+
+    String readColNamesPrefix = readColNames + ",";
+    if (readColNames == null || readColNames.isEmpty()) {
+      readColNamesPrefix = "";
+    }
+    String readColIdsPrefix = readColIds + ",";
+    if (readColIds == null || readColIds.isEmpty()) {
+      readColIdsPrefix = "";
+    }
+
+    if (!readColNames.contains(fieldName)) {
+      // If not already in the list - then add it
+      conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNamesPrefix + fieldName);
+      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Adding extra column " + fieldName + ", to enable log merging cols (%s) ids (%s) ",
+            conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
+            conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
+      }
+    }
+    return conf;
+  }
+
+  public static void addRequiredProjectionFields(Configuration configuration) {
+    // Need this to do merge records in HoodieRealtimeRecordReader
+    addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
+    addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
+    addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
+  }
+
+  /**
+   * Hive will append read columns' ids to old columns' ids during getRecordReader. In some cases, e.g. SELECT COUNT(*),
+   * the read columns' id is an empty string and Hive will combine it with Hoodie required projection ids and becomes
+   * e.g. ",2,0,3" and will cause an error. Actually this method is a temporary solution because the real bug is from
+   * Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438)
+   */
+  public static void cleanProjectionColumnIds(Configuration conf) {
+    String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
+    if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
+      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
+      }
+    }
+  }
 }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
new file mode 100644
index 0000000..2c34027
--- /dev/null
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieHFileInputFormat {
+
+  private HoodieHFileInputFormat inputFormat;
+  private JobConf jobConf;
+  private final HoodieFileFormat baseFileFormat = HoodieFileFormat.HFILE;
+  private final String baseFileExtension = baseFileFormat.getFileExtension();
+
+  public static void ensureFilesInCommit(String msg, FileStatus[] files, String commit, int expected) {
+    int count = 0;
+    for (FileStatus file : files) {
+      String commitTs = FSUtils.getCommitTime(file.getPath().getName());
+      if (commit.equals(commitTs)) {
+        count++;
+      }
+    }
+    assertEquals(expected, count, msg);
+  }
+
+  @BeforeEach
+  public void setUp() {
+    inputFormat = new HoodieHFileInputFormat();
+    jobConf = new JobConf();
+    inputFormat.setConf(jobConf);
+  }
+
+  @TempDir
+  public java.nio.file.Path basePath;
+
+  // Verify that HoodieParquetInputFormat does not return instants after pending compaction
+  @Test
+  public void testPendingCompactionWithActiveCommits() throws IOException {
+    // setup 4 sample instants in timeline
+    List<HoodieInstant> instants = new ArrayList<>();
+    HoodieInstant t1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
+    HoodieInstant t2 = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "2");
+    HoodieInstant t3 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "3");
+    HoodieInstant t4 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "4");
+    HoodieInstant t5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "5");
+    HoodieInstant t6 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "6");
+
+    instants.add(t1);
+    instants.add(t2);
+    instants.add(t3);
+    instants.add(t4);
+    instants.add(t5);
+    instants.add(t6);
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.toString(), HoodieFileFormat.HFILE);
+    HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient);
+    timeline.setInstants(instants);
+
+    // Verify getCommitsTimelineBeforePendingCompaction does not return instants after first compaction instant
+    HoodieTimeline filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
+    assertTrue(filteredTimeline.containsInstant(t1));
+    assertTrue(filteredTimeline.containsInstant(t2));
+    assertFalse(filteredTimeline.containsInstant(t3));
+    assertFalse(filteredTimeline.containsInstant(t4));
+    assertFalse(filteredTimeline.containsInstant(t5));
+    assertFalse(filteredTimeline.containsInstant(t6));
+
+
+    // remove compaction instant and setup timeline again
+    instants.remove(t3);
+    timeline = new HoodieActiveTimeline(metaClient);
+    timeline.setInstants(instants);
+    filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
+
+    // verify all remaining instants are returned.
+    assertTrue(filteredTimeline.containsInstant(t1));
+    assertTrue(filteredTimeline.containsInstant(t2));
+    assertFalse(filteredTimeline.containsInstant(t3));
+    assertTrue(filteredTimeline.containsInstant(t4));
+    assertFalse(filteredTimeline.containsInstant(t5));
+    assertFalse(filteredTimeline.containsInstant(t6));
+
+    // remove remaining compaction instant and setup timeline again
+    instants.remove(t5);
+    timeline = new HoodieActiveTimeline(metaClient);
+    timeline.setInstants(instants);
+    filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
+
+    // verify all remaining instants are returned.
+    assertTrue(filteredTimeline.containsInstant(t1));
+    assertTrue(filteredTimeline.containsInstant(t2));
+    assertFalse(filteredTimeline.containsInstant(t3));
+    assertTrue(filteredTimeline.containsInstant(t4));
+    assertFalse(filteredTimeline.containsInstant(t5));
+    assertTrue(filteredTimeline.containsInstant(t6));
+  }
+
+  @Test
+  public void testInputFormatLoad() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    InputFormatTestUtil.commit(basePath, "100");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+    InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
+    assertEquals(10, inputSplits.length);
+  }
+
+  @Test
+  public void testInputFormatUpdates() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    InputFormatTestUtil.commit(basePath, "100");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+
+    // update files
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 5, "200", true);
+    // Before the commit
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+    ensureFilesInCommit("Commit 200 has not been committed. We should not see files from this commit", files, "200", 0);
+    InputFormatTestUtil.commit(basePath, "200");
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+    ensureFilesInCommit("5 files have been updated to commit 200. We should see 5 files from commit 200 and 5 "
+        + "files from 100 commit", files, "200", 5);
+    ensureFilesInCommit("5 files have been updated to commit 200. We should see 5 files from commit 100 and 5 "
+        + "files from 200 commit", files, "100", 5);
+  }
+
+  @Test
+  public void testInputFormatWithCompaction() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    InputFormatTestUtil.commit(basePath, "100");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
+    assertEquals(10, inputSplits.length);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+
+    // simulate compaction requested
+    createCompactionFile(basePath, "125");
+
+    // add inserts after compaction timestamp
+    InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId2", 5, "200");
+    InputFormatTestUtil.commit(basePath, "200");
+
+    // verify snapshot reads show all new inserts even though there is pending compaction
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(15, files.length);
+
+    // verify that incremental reads do NOT show inserts after compaction timestamp
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 10);
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length,
+        "We should exclude commit 200 when there is a pending compaction at 150");
+  }
+
+  @Test
+  public void testIncrementalSimple() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    createCommitFile(basePath, "100", "2016/05/01");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length,
+        "We should exclude commit 100 when returning incremental pull with start commit time as 100");
+  }
+
+  private void createCommitFile(java.nio.file.Path basePath, String commitNumber, String partitionPath)
+      throws IOException {
+    List<HoodieWriteStat> writeStats = HoodieTestUtils.generateFakeHoodieWriteStat(1);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat));
+    File file = basePath.resolve(".hoodie").resolve(commitNumber + ".commit").toFile();
+    file.createNewFile();
+    FileOutputStream fileOutputStream = new FileOutputStream(file);
+    fileOutputStream.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+    fileOutputStream.flush();
+    fileOutputStream.close();
+  }
+
+  private File createCompactionFile(java.nio.file.Path basePath, String commitTime)
+      throws IOException {
+    File file = basePath.resolve(".hoodie")
+        .resolve(HoodieTimeline.makeRequestedCompactionFileName(commitTime)).toFile();
+    assertTrue(file.createNewFile());
+    FileOutputStream os = new FileOutputStream(file);
+    try {
+      HoodieCompactionPlan compactionPlan = HoodieCompactionPlan.newBuilder().setVersion(2).build();
+      // Write empty commit metadata
+      os.write(TimelineMetadataUtils.serializeCompactionPlan(compactionPlan).get());
+      return file;
+    } finally {
+      os.close();
+    }
+  }
+
+  @Test
+  public void testIncrementalWithMultipleCommits() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    createCommitFile(basePath, "100", "2016/05/01");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    // update files
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 5, "200", false);
+    createCommitFile(basePath, "200", "2016/05/01");
+
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 4, "300", false);
+    createCommitFile(basePath, "300", "2016/05/01");
+
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 3, "400", false);
+    createCommitFile(basePath, "400", "2016/05/01");
+
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 2, "500", false);
+    createCommitFile(basePath, "500", "2016/05/01");
+
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 1, "600", false);
+    createCommitFile(basePath, "600", "2016/05/01");
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(5, files.length, "Pulling 1 commit from 100, should get us the 5 files committed at 200");
+    ensureFilesInCommit("Pulling 1 commit from 100, should get us the 5 files committed at 200", files, "200", 5);
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 3);
+    files = inputFormat.listStatus(jobConf);
+
+    assertEquals(5, files.length, "Pulling 3 commits from 100, should get us the 3 files from 400 commit, 1 file from 300 "
+        + "commit and 1 file from 200 commit");
+    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 3 files from 400 commit", files, "400", 3);
+    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit", files, "300", 1);
+    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit", files, "200", 1);
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtils.MAX_COMMIT_ALL);
+    files = inputFormat.listStatus(jobConf);
+
+    assertEquals(5, files.length,
+        "Pulling all commits from 100, should get us the 1 file from each of 200,300,400,500,400 commits");
+    ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 600 commit", files, "600", 1);
+    ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 500 commit", files, "500", 1);
+    ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 400 commit", files, "400", 1);
+    ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 300 commit", files, "300", 1);
+    ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200", 1);
+  }
+
+  // TODO enable this after enabling predicate pushdown
+  public void testPredicatePushDown() throws IOException {
+    // initial commit
+    Schema schema = getSchemaFromResource(TestHoodieHFileInputFormat.class, "/sample1.avsc");
+    String commit1 = "20160628071126";
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 10, commit1);
+    InputFormatTestUtil.commit(basePath, commit1);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    // check whether we have 10 records at this point
+    ensureRecordsInCommit("We need to have 10 records at this point for commit " + commit1, commit1, 10, 10);
+
+    // update 2 records in the original parquet file and save it as commit 200
+    String commit2 = "20160629193623";
+    InputFormatTestUtil.simulateParquetUpdates(partitionDir, schema, commit1, 10, 2, commit2);
+    InputFormatTestUtil.commit(basePath, commit2);
+
+    InputFormatTestUtil.setupIncremental(jobConf, commit1, 1);
+    // check whether we have 2 records at this point
+    ensureRecordsInCommit("We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2,
+        2, 2);
+    // Make sure we have the 10 records if we roll back the stattime
+    InputFormatTestUtil.setupIncremental(jobConf, "0", 2);
+    ensureRecordsInCommit("We need to have 8 records that was modified at commit " + commit1 + " and no more", commit1,
+        8, 10);
+    ensureRecordsInCommit("We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2,
+        2, 10);
+  }
+
+  @Test
+  public void testGetIncrementalTableNames() throws IOException {
+    String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
+    JobConf conf = new JobConf();
+    String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
+    conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
+    String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
+    conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
+    String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
+    conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
+    String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
+    conf.set(defaultmode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
+    List<String> actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
+    for (String expectedincrTable : expectedincrTables) {
+      assertTrue(actualincrTables.contains(expectedincrTable));
+    }
+  }
+
+  // test incremental read does not go past compaction instant for RO views
+  @Test
+  public void testIncrementalWithPendingCompaction() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    createCommitFile(basePath, "100", "2016/05/01");
+
+    // simulate compaction requested at 300
+    File compactionFile = createCompactionFile(basePath, "300");
+
+    // write inserts into new bucket
+    InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId2", 10, "400");
+    createCommitFile(basePath, "400", "2016/05/01");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    InputFormatTestUtil.setupIncremental(jobConf, "0", -1);
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "Pulling all commit from beginning, should not return instants after begin compaction");
+    ensureFilesInCommit("Pulling all commit from beginning, should not return instants after begin compaction",
+        files, "100", 10);
+
+    // delete compaction and verify inserts show up
+    compactionFile.delete();
+    InputFormatTestUtil.setupIncremental(jobConf, "0", -1);
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(20, files.length,
+        "after deleting compaction, should get all inserted files");
+
+    ensureFilesInCommit("Pulling all commit from beginning, should return instants before requested compaction",
+        files, "100", 10);
+    ensureFilesInCommit("Pulling all commit from beginning, should return instants after requested compaction",
+        files, "400", 10);
+
+  }
+
+  private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
+      int totalExpected) throws IOException {
+    int actualCount = 0;
+    int totalCount = 0;
+    InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+    for (InputSplit split : splits) {
+      RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(split, jobConf, null);
+      NullWritable key = recordReader.createKey();
+      ArrayWritable writable = recordReader.createValue();
+
+      while (recordReader.next(key, writable)) {
+        // writable returns an array with [field1, field2, _hoodie_commit_time,
+        // _hoodie_commit_seqno]
+        // Take the commit time and compare with the one we are interested in
+        if (commit.equals((writable.get()[2]).toString())) {
+          actualCount++;
+        }
+        totalCount++;
+      }
+    }
+    assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
+    assertEquals(totalExpected, totalCount, msg);
+  }
+}
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index eda776e..8cb64f8 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -255,7 +255,7 @@ public abstract class ITTestBase {
     try {
       // save up the Hive log files for introspection
       String hiveLogStr =
-          executeCommandStringInDocker(HIVESERVER, "cat /tmp/root/hive.log |  grep -i exception -A 10 -B 5", true).getStdout().toString();
+          executeCommandStringInDocker(HIVESERVER, "cat /tmp/root/hive.log |  grep -i exception -A 10 -B 5", false).getStdout().toString();
       String filePath = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis() + "-hive.log";
       FileIOUtils.writeStringToFile(hiveLogStr, filePath);
       LOG.info("Hive log saved up at  : " + filePath);
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index eb608df..444a751 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
@@ -77,7 +79,7 @@ public class ITTestHoodieDemo extends ITTestBase {
   private static final String HIVE_INCREMENTAL_MOR_RO_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-ro.commands";
   private static final String HIVE_INCREMENTAL_MOR_RT_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-rt.commands";
 
-  private static HoodieFileFormat baseFileFormat;
+  private HoodieFileFormat baseFileFormat;
 
   private static String HIVE_SYNC_CMD_FMT =
       " --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
@@ -115,6 +117,36 @@ public class ITTestHoodieDemo extends ITTestBase {
     testIncrementalHiveQueryAfterCompaction();
   }
 
+  @Test
+  @Disabled
+  public void testHFileDemo() throws Exception {
+    baseFileFormat = HoodieFileFormat.HFILE;
+
+    // TODO: Preseto and SparkSQL support for HFile format
+
+    setupDemo();
+
+    // batch 1
+    ingestFirstBatchAndHiveSync();
+    testHiveAfterFirstBatch();
+    //testPrestoAfterFirstBatch();
+    //testSparkSQLAfterFirstBatch();
+
+    // batch 2
+    ingestSecondBatchAndHiveSync();
+    testHiveAfterSecondBatch();
+    //testPrestoAfterSecondBatch();
+    //testSparkSQLAfterSecondBatch();
+    testIncrementalHiveQueryBeforeCompaction();
+    //testIncrementalSparkSQLQuery();
+
+    // compaction
+    scheduleAndRunCompaction();
+    testHiveAfterSecondBatchAfterCompaction();
+    //testPrestoAfterSecondBatchAfterCompaction();
+    //testIncrementalHiveQueryAfterCompaction();
+  }
+
   private void setupDemo() throws Exception {
     List<String> cmds = CollectionUtils.createImmutableList("hdfs dfsadmin -safemode wait",
         "hdfs dfs -mkdir -p " + HDFS_DATA_DIR,
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index eaab646..d92d09e 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -102,6 +102,7 @@
                   <include>org.apache.hive:hive-service-rpc</include>
                   <include>org.apache.hive:hive-metastore</include>
                   <include>org.apache.hive:hive-jdbc</include>
+
                   <include>org.apache.hbase:hbase-client</include>
                   <include>org.apache.hbase:hbase-common</include>
                   <include>org.apache.hbase:hbase-protocol</include>
@@ -289,6 +290,12 @@
       <scope>compile</scope>
     </dependency>
 
+    <!-- Hbase -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
@@ -313,6 +320,16 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
 
     <!-- TODO: Reinvestigate PR 633 -->
   </dependencies>
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index eef203c..43e519a 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -113,6 +113,7 @@
                   <include>org.apache.hive:hive-service-rpc</include>
                   <include>org.apache.hive:hive-metastore</include>
                   <include>org.apache.hive:hive-jdbc</include>
+
                   <include>org.apache.hbase:hbase-client</include>
                   <include>org.apache.hbase:hbase-common</include>
                   <include>org.apache.hbase:hbase-protocol</include>
@@ -285,6 +286,12 @@
       <scope>compile</scope>
     </dependency>
 
+    <!-- Hbase -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
@@ -309,7 +316,16 @@
         </exclusion>
       </exclusions>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
   </dependencies>
 
   <profiles>