Posted to commits@hudi.apache.org by vb...@apache.org on 2020/06/26 06:47:06 UTC

[hudi] branch master updated: [HUDI-684] Introduced abstraction for writing and reading different types of base file formats. (#1687)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2603cfb  [HUDI-684] Introduced abstraction for writing and reading different types of base file formats. (#1687)
2603cfb is described below

commit 2603cfb33e272632d7f36a53e1b13fe86dbb8627
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Thu Jun 25 23:46:55 2020 -0700

    [HUDI-684] Introduced abstraction for writing and reading different types of base file formats. (#1687)
    
    Notable changes:
        1. HoodieFileWriter and HoodieFileReader abstractions for the writer/reader side of a base file format (see the usage sketch after this list)
        2. HoodieDataBlock abstraction for creating format-specific data blocks for base file formats (e.g. Parquet uses HoodieAvroDataBlock)
        3. All hardcoded references to Parquet / Parquet-based classes have been abstracted into methods that accept a base file format
        4. HiveSyncTool accepts the base file format as a CLI parameter
        5. HoodieDeltaStreamer accepts the base file format as a CLI parameter
        6. HoodieSparkSqlWriter accepts the base file format as a parameter
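    
    For orientation (not part of the original commit message): a minimal usage sketch of the new
    abstractions, assembled from the call sites in the diff below. The class and method names
    (HoodieFileWriterFactory.getFileWriter, HoodieFileReaderFactory.getFileReader, canWrite,
    writeAvro, readBloomFilter) come from the patch; the import paths, generics and the surrounding
    helper class are approximations for this internal API, not a definitive example.
    
        import org.apache.avro.Schema;
        import org.apache.avro.generic.IndexedRecord;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hudi.client.SparkTaskContextSupplier;   // import path approximate
        import org.apache.hudi.common.bloom.BloomFilter;          // import path approximate
        import org.apache.hudi.common.model.HoodieRecordPayload;
        import org.apache.hudi.config.HoodieWriteConfig;
        import org.apache.hudi.io.storage.HoodieFileReader;
        import org.apache.hudi.io.storage.HoodieFileReaderFactory;
        import org.apache.hudi.io.storage.HoodieFileWriter;
        import org.apache.hudi.io.storage.HoodieFileWriterFactory;
        import org.apache.hudi.table.HoodieTable;
        
        import java.io.IOException;
        import java.util.Iterator;
        
        /** Hypothetical helper showing format-neutral call sites; not part of the patch itself. */
        public class BaseFileFormatSketch {
        
          /** Write records through HoodieFileWriter; the factory picks the concrete
           *  implementation from the file extension (HoodieParquetWriter for .parquet). */
          static <T extends HoodieRecordPayload> void writeRecords(
              String instantTime, Path baseFilePath, HoodieTable<T> table, HoodieWriteConfig config,
              Schema schema, SparkTaskContextSupplier taskContextSupplier,
              Iterator<IndexedRecord> records) throws IOException {
            HoodieFileWriter<IndexedRecord> writer = HoodieFileWriterFactory.getFileWriter(
                instantTime, baseFilePath, table, config, schema, taskContextSupplier);
            try {
              while (records.hasNext() && writer.canWrite()) {
                // writeAvroWithMetadata(record, hoodieRecord) is used instead when the Hudi
                // metadata columns must be populated; "record-key" is only a placeholder here.
                writer.writeAvro("record-key", records.next());
              }
            } finally {
              writer.close();
            }
          }
        
          /** Read index metadata through HoodieFileReader instead of ParquetUtils. */
          static BloomFilter readBloomFilter(Configuration hadoopConf, Path baseFilePath) throws IOException {
            HoodieFileReader<IndexedRecord> reader =
                HoodieFileReaderFactory.getFileReader(hadoopConf, baseFilePath);
            return reader.readBloomFilter();
          }
        }
    
    On the log side the same idea applies: HoodieAppendHandle now builds blocks via
    HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header) instead of
    constructing HoodieAvroDataBlock directly (see the HoodieAppendHandle hunk below).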
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    |  10 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   4 +-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  15 +-
 .../org/apache/hudi/io/HoodieKeyLookupHandle.java  |  17 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  19 +-
 .../org/apache/hudi/io/HoodieRangeInfoHandle.java  |   9 +-
 .../java/org/apache/hudi/io/HoodieReadHandle.java  |  10 +
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  10 +-
 ...dieStorageWriter.java => HoodieFileWriter.java} |   2 +-
 ...erFactory.java => HoodieFileWriterFactory.java} |  12 +-
 .../hudi/io/storage/HoodieParquetWriter.java       |  17 +-
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  |  16 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  36 ++-
 .../table/action/commit/CommitActionExecutor.java  |  15 +-
 .../deltacommit/DeltaCommitActionExecutor.java     |   2 +-
 .../hudi/table/action/rollback/RollbackHelper.java |   6 +-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |   6 +-
 ...ctory.java => TestHoodieFileWriterFactory.java} |  10 +-
 .../hudi/table/TestHoodieMergeOnReadTable.java     | 286 ++++++++++++---------
 .../hudi/testutils/HoodieClientTestUtils.java      |  14 +-
 .../hudi/testutils/HoodieMergeOnReadTestUtils.java |  35 +--
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  26 +-
 .../hudi/common/table/HoodieTableMetaClient.java   |  33 ++-
 .../hudi/common/table/TableSchemaResolver.java     |  23 +-
 .../table/log/AbstractHoodieLogRecordScanner.java  |   5 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |   2 +-
 .../hudi/common/table/log/LogReaderUtils.java      |   6 +-
 .../table/log/block/HoodieAvroDataBlock.java       |  98 ++-----
 .../common/table/log/block/HoodieDataBlock.java    | 132 ++++++++++
 .../common/table/log/block/HoodieLogBlock.java     |   4 +-
 .../table/view/AbstractTableFileSystemView.java    |   3 +-
 .../hudi/common/util}/ParquetReaderIterator.java   |   2 +-
 .../apache/hudi/io/storage/HoodieFileReader.java   |  22 +-
 .../hudi/io/storage/HoodieFileReaderFactory.java   |  47 ++++
 .../hudi/io/storage/HoodieParquetReader.java       |  80 ++++++
 .../common/functional/TestHoodieAvroLogFormat.java |  24 +-
 .../common/functional/TestHoodieLogFormat.java     | 124 +++++----
 .../hudi/common/testutils/HoodieTestUtils.java     |  50 +++-
 .../common/util}/TestParquetReaderIterator.java    |   2 +-
 .../io/storage/TestHoodieFileReaderFactory.java    |  54 ++++
 .../hudi/hadoop/HoodieParquetInputFormat.java      |   4 +-
 .../hadoop/hive/HoodieCombineHiveInputFormat.java  |   8 +-
 .../realtime/AbstractRealtimeRecordReader.java     |   6 +-
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |  63 ++++-
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  15 +-
 .../hudi/hadoop/TestHoodieParquetInputFormat.java  |  31 ++-
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |  23 +-
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  |   3 +
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |  37 +--
 .../org/apache/hudi/integ/ITTestHoodieDemo.java    |  10 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java |   3 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   2 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   2 +
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   6 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  15 ++
 55 files changed, 1032 insertions(+), 484 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index f7c4d65..a5fe4fe 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -30,8 +30,8 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
@@ -118,8 +118,8 @@ public class HoodieLogFileCommand implements CommandMarker {
             dummyInstantTimeCount++;
             instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
           }
-          if (n instanceof HoodieAvroDataBlock) {
-            recordCount = ((HoodieAvroDataBlock) n).getRecords().size();
+          if (n instanceof HoodieDataBlock) {
+            recordCount = ((HoodieDataBlock) n).getRecords().size();
           }
         }
         if (commitCountAndMetadata.containsKey(instantTime)) {
@@ -215,8 +215,8 @@ public class HoodieLogFileCommand implements CommandMarker {
         // read the avro blocks
         while (reader.hasNext()) {
           HoodieLogBlock n = reader.next();
-          if (n instanceof HoodieAvroDataBlock) {
-            HoodieAvroDataBlock blk = (HoodieAvroDataBlock) n;
+          if (n instanceof HoodieDataBlock) {
+            HoodieDataBlock blk = (HoodieDataBlock) n;
             List<IndexedRecord> records = blk.getRecords();
             for (IndexedRecord record : records) {
               if (allRecords.size() < limit) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index cc91e89..12e3a07 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -34,7 +34,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
@@ -207,7 +207,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
       if (recordList.size() > 0) {
-        writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
+        writer = writer.appendBlock(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
         recordList.clear();
       }
       if (keysToDelete.size() > 0) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 5d64c84..c437f54 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -30,8 +30,7 @@ import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.io.storage.HoodieStorageWriter;
-import org.apache.hudi.io.storage.HoodieStorageWriterFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
@@ -47,7 +46,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
 
   private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);
 
-  private final HoodieStorageWriter<IndexedRecord> storageWriter;
+  private final HoodieFileWriter<IndexedRecord> fileWriter;
   private final Path path;
   private long recordsWritten = 0;
   private long insertRecordsWritten = 0;
@@ -68,8 +67,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
           new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
       partitionMetadata.trySave(getPartitionId());
       createMarkerFile(partitionPath);
-      this.storageWriter =
-          HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
+      this.fileWriter = createNewFileWriter(instantTime, path, hoodieTable, config, writerSchema,
+          this.sparkTaskContextSupplier);
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
     }
@@ -88,7 +87,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
 
   @Override
   public boolean canWrite(HoodieRecord record) {
-    return storageWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath());
+    return fileWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath());
   }
 
   /**
@@ -101,7 +100,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
       if (avroRecord.isPresent()) {
         // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
         IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
-        storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
+        fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
         // update the new location of record, so we know where to find it next
         record.unseal();
         record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
@@ -156,7 +155,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
         .info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
     try {
 
-      storageWriter.close();
+      fileWriter.close();
 
       HoodieWriteStat stat = new HoodieWriteStat();
       stat.setPartitionPath(writeStatus.getPartitionPath());
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
index 02f351e..4002b5a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.HoodieTimer;
-import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
@@ -34,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -61,23 +61,26 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
     HoodieTimer timer = new HoodieTimer().startTimer();
-    this.bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(hoodieTable.getHadoopConf(),
-        new Path(getLatestDataFile().getPath()));
+
+    try {
+      this.bloomFilter = createNewFileReader().readBloomFilter();
+    } catch (IOException e) {
+      throw new HoodieIndexException(String.format("Error reading bloom filter from %s: %s", partitionPathFilePair, e));
+    }
     LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer()));
   }
 
   /**
    * Given a list of row keys and one file, return only row keys existing in that file.
    */
-  public static List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
-                                                        Path filePath) throws HoodieIndexException {
+  public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
+                                                 Path filePath) throws HoodieIndexException {
     List<String> foundRecordKeys = new ArrayList<>();
     try {
       // Load all rowKeys from the file, to double-confirm
       if (!candidateRecordKeys.isEmpty()) {
         HoodieTimer timer = new HoodieTimer().startTimer();
-        Set<String> fileRowKeys =
-            ParquetUtils.filterParquetRowKeys(configuration, filePath, new HashSet<>(candidateRecordKeys));
+        Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
         foundRecordKeys.addAll(fileRowKeys);
         LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
             timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 3e6af34..0eaf3f2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -37,8 +37,7 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.storage.HoodieStorageWriter;
-import org.apache.hudi.io.storage.HoodieStorageWriterFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
@@ -61,7 +60,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
 
   private Map<String, HoodieRecord<T>> keyToNewRecords;
   private Set<String> writtenRecordKeys;
-  private HoodieStorageWriter<IndexedRecord> storageWriter;
+  private HoodieFileWriter<IndexedRecord> fileWriter;
   private Path newFilePath;
   private Path oldFilePath;
   private long recordsWritten = 0;
@@ -115,7 +114,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
 
       oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
       String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
-          + FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString();
+          + FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension())).toString();
       newFilePath = new Path(config.getBasePath(), relativePath);
 
       LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
@@ -131,8 +130,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       createMarkerFile(partitionPath);
 
       // Create the writer for writing the new version file
-      storageWriter =
-          HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
+      fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
+
     } catch (IOException io) {
       LOG.error("Error in update task at commit " + instantTime, io);
       writeStatus.setGlobalError(io);
@@ -190,7 +189,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       if (indexedRecord.isPresent()) {
         // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
         IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
-        storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
+        fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
         recordsWritten++;
       } else {
         recordsDeleted++;
@@ -243,7 +242,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       String errMsg = "Failed to merge old record into new file for key " + key + " from old file " + getOldFilePath()
           + " to new file " + newFilePath;
       try {
-        storageWriter.writeAvro(key, oldRecord);
+        fileWriter.writeAvro(key, oldRecord);
       } catch (ClassCastException e) {
         LOG.error("Schema mismatch when rewriting old record " + oldRecord + " from file " + getOldFilePath()
             + " to file " + newFilePath + " with writerSchema " + writerSchema.toString(true));
@@ -277,8 +276,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       keyToNewRecords.clear();
       writtenRecordKeys.clear();
 
-      if (storageWriter != null) {
-        storageWriter.close();
+      if (fileWriter != null) {
+        fileWriter.close();
       }
 
       long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
index e84b0fa..2b58583 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieRangeInfoHandle.java
@@ -18,14 +18,12 @@
 
 package org.apache.hudi.io;
 
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.hadoop.fs.Path;
+import java.io.IOException;
 
 /**
  * Extract range information for a given file slice.
@@ -37,8 +35,7 @@ public class HoodieRangeInfoHandle<T extends HoodieRecordPayload> extends Hoodie
     super(config, null, hoodieTable, partitionPathFilePair);
   }
 
-  public String[] getMinMaxKeys() {
-    HoodieBaseFile dataFile = getLatestDataFile();
-    return ParquetUtils.readMinMaxRecordKeys(hoodieTable.getHadoopConf(), new Path(dataFile.getPath()));
+  public String[] getMinMaxKeys() throws IOException {
+    return createNewFileReader().readMinMaxRecordKeys();
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
index 6662d00..8f2d4de 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
@@ -22,9 +22,14 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
+import java.io.IOException;
+
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Base class for read operations done logically on the file group.
@@ -56,4 +61,9 @@ public abstract class HoodieReadHandle<T extends HoodieRecordPayload> extends Ho
     return hoodieTable.getBaseFileOnlyView()
         .getLatestBaseFile(partitionPathFilePair.getLeft(), partitionPathFilePair.getRight()).get();
   }
+
+  protected HoodieFileReader createNewFileReader() throws IOException {
+    return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(),
+        new Path(getLatestDataFile().getPath()));
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 08bb06e..4470b76 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -30,6 +30,8 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
@@ -86,7 +88,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
 
-    return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId));
+    return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId,
+        hoodieTable.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension()));
   }
 
   /**
@@ -180,4 +183,9 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
   protected long getAttemptId() {
     return sparkTaskContextSupplier.getAttemptIdSupplier().get();
   }
+
+  protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T> hoodieTable,
+      HoodieWriteConfig config, Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
+    return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier);
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
similarity index 94%
copy from hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
copy to hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index d8cdb0f..ea9ecad 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -24,7 +24,7 @@ import org.apache.avro.generic.IndexedRecord;
 
 import java.io.IOException;
 
-public interface HoodieStorageWriter<R extends IndexedRecord> {
+public interface HoodieFileWriter<R extends IndexedRecord> {
 
   void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
similarity index 85%
rename from hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
rename to hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index cba4b05..90566fb 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -34,23 +34,21 @@ import org.apache.parquet.avro.AvroSchemaConverter;
 
 import java.io.IOException;
 
-import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 
-public class HoodieStorageWriterFactory {
+public class HoodieFileWriterFactory {
 
-  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
+  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> getFileWriter(
       String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config, Schema schema,
       SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
-    final String name = path.getName();
-    final String extension = FSUtils.isLogFile(path) ? HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name);
+    final String extension = FSUtils.getFileExtension(path.getName());
     if (PARQUET.getFileExtension().equals(extension)) {
-      return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
+      return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
     }
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
 
-  private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter(
+  private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
       String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
       SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
     BloomFilter filter = BloomFilterFactory
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index 35258ff..8c4c7e6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
@@ -42,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * the current file can take more records with the <code>canWrite()</code>
  */
 public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
-    extends ParquetWriter<IndexedRecord> implements HoodieStorageWriter<R> {
+    extends ParquetWriter<IndexedRecord> implements HoodieFileWriter<R> {
 
   private static AtomicLong recordIndex = new AtomicLong(1);
 
@@ -51,7 +50,6 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
   private final long maxFileSize;
   private final HoodieAvroWriteSupport writeSupport;
   private final String instantTime;
-  private final Schema schema;
   private final SparkTaskContextSupplier sparkTaskContextSupplier;
 
   public HoodieParquetWriter(String instantTime, Path file, HoodieParquetConfig parquetConfig,
@@ -60,10 +58,10 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
         ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
         parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(),
         ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
-        ParquetWriter.DEFAULT_WRITER_VERSION, registerFileSystem(file, parquetConfig.getHadoopConf()));
+        ParquetWriter.DEFAULT_WRITER_VERSION, FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
     this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
     this.fs =
-        (HoodieWrapperFileSystem) this.file.getFileSystem(registerFileSystem(file, parquetConfig.getHadoopConf()));
+        (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
     // We cannot accurately measure the snappy compressed output file size. We are choosing a
     // conservative 10%
     // TODO - compute this compression ratio dynamically by looking at the bytes written to the
@@ -72,18 +70,9 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
         + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
     this.writeSupport = parquetConfig.getWriteSupport();
     this.instantTime = instantTime;
-    this.schema = schema;
     this.sparkTaskContextSupplier = sparkTaskContextSupplier;
   }
 
-  public static Configuration registerFileSystem(Path file, Configuration conf) {
-    Configuration returnConf = new Configuration(conf);
-    String scheme = FSUtils.getFs(file.toString(), conf).getScheme();
-    returnConf.set("fs." + HoodieWrapperFileSystem.getHoodieScheme(scheme) + ".impl",
-        HoodieWrapperFileSystem.class.getName());
-    return returnConf;
-  }
-
   @Override
   public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
     String seqId =
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 64b3bf0..9f3bb82 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -27,7 +27,6 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.ParquetReaderIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -44,6 +43,8 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
@@ -58,9 +59,6 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroParquetReader;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -150,11 +148,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());
       BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-      try (ParquetReader<IndexedRecord> reader =
-          AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(getHadoopConf()).build()) {
-        wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
+      HoodieFileReader<IndexedRecord> storageReader =
+          HoodieFileReaderFactory.getFileReader(getHadoopConf(), upsertHandle.getOldFilePath());
+
+      try {
+        wrapper =
+            new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
             new UpdateHandler(upsertHandle), x -> x);
         wrapper.execute();
       } catch (Exception e) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 0584dbf..49d8858 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -34,12 +34,14 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
 import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -310,7 +312,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
 
   /**
    * Schedule compaction for the instant time.
-   * 
+   *
    * @param jsc Spark Context
    * @param instantTime Instant Time for scheduling compaction
    * @param extraMetadata additional metadata to write into plan
@@ -381,7 +383,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
 
   /**
    * Delete Marker directory corresponding to an instant.
-   * 
+   *
    * @param instantTs Instant Time
    */
   public void deleteMarkerDir(String instantTs) {
@@ -422,9 +424,11 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
         return;
       }
 
-      List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString());
+      final String baseFileExtension = getBaseFileFormat().getFileExtension();
+      List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString(),
+          baseFileExtension);
       List<String> validDataPaths = stats.stream().map(w -> String.format("%s/%s", basePath, w.getPath()))
-          .filter(p -> p.endsWith(".parquet")).collect(Collectors.toList());
+          .filter(p -> p.endsWith(baseFileExtension)).collect(Collectors.toList());
       // Contains list of partially created files. These needs to be cleaned up.
       invalidDataPaths.removeAll(validDataPaths);
       if (!invalidDataPaths.isEmpty()) {
@@ -478,7 +482,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
 
   /**
    * Ensures all files passed either appear or disappear.
-   * 
+   *
    * @param jsc JavaSparkContext
    * @param groupByPartition Files grouped by partition
    * @param visibility Appear/Disappear
@@ -562,4 +566,26 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
       throw new HoodieInsertException("Failed insert schema compability check.", e);
     }
   }
+
+  public HoodieFileFormat getBaseFileFormat() {
+    return metaClient.getTableConfig().getBaseFileFormat();
+  }
+
+  public HoodieFileFormat getLogFileFormat() {
+    return metaClient.getTableConfig().getLogFileFormat();
+  }
+
+  public HoodieLogBlockType getLogDataBlockFormat() {
+    switch (getBaseFileFormat()) {
+      case PARQUET:
+        return HoodieLogBlockType.AVRO_DATA_BLOCK;
+      default:
+        throw new HoodieException("Base file format " + getBaseFileFormat()
+            + " does not have associated log block format");
+    }
+  }
+
+  public String getBaseFileExtension() {
+    return getBaseFileFormat().getFileExtension();
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
index 5208c12..c07d4c9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.ParquetReaderIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -32,6 +31,8 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.LazyInsertIterable;
 import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
 import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 
@@ -39,9 +40,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroParquetReader;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -89,11 +87,12 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), upsertHandle.getWriterSchema());
       BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-      try (ParquetReader<IndexedRecord> reader =
-          AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(table.getHadoopConf()).build()) {
-        wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
+      try {
+        HoodieFileReader<IndexedRecord> storageReader =
+            HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), upsertHandle.getOldFilePath());
+        wrapper =
+            new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
             new UpdateHandler(upsertHandle), x -> x);
         wrapper.execute();
       } catch (Exception e) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
index be3806e..8c24afd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
@@ -83,7 +83,7 @@ public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>
   @Override
   public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
       throws Exception {
-    // If canIndexLogFiles, write inserts to log files else write inserts to parquet files
+    // If canIndexLogFiles, write inserts to log files else write inserts to base files
     if (table.getIndex().canIndexLogFiles()) {
       return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
           sparkTaskContextSupplier, new AppendHandleFactory<>());
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
index f64483a..4a9c20a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
@@ -71,8 +71,9 @@ public class RollbackHelper implements Serializable {
    */
   public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
 
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
     SerializablePathFilter filter = (path) -> {
-      if (path.toString().contains(".parquet")) {
+      if (path.toString().contains(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return instantToRollback.getTimestamp().equals(fileCommitTime);
       } else if (path.toString().contains(".log")) {
@@ -184,8 +185,9 @@ public class RollbackHelper implements Serializable {
       Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
     LOG.info("Cleaning path " + partitionPath);
     FileSystem fs = metaClient.getFs();
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
     PathFilter filter = (path) -> {
-      if (path.toString().contains(".parquet")) {
+      if (path.toString().contains(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return commit.equals(fileCommitTime);
       }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 97acf03..2e91a84 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -257,7 +257,11 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     List<String> uuids =
         Arrays.asList(record1.getRecordKey(), record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey());
 
-    List<String> results = HoodieKeyLookupHandle.checkCandidatesAgainstFile(hadoopConf, uuids,
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
+    HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, table,
+        Pair.of("2016/01/31/", FSUtils.getFileId(filename)));
+    List<String> results = keyHandle.checkCandidatesAgainstFile(hadoopConf, uuids,
         new Path(basePath + "/2016/01/31/" + filename));
     assertEquals(results.size(), 2);
     assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
old mode 100755
new mode 100644
similarity index 85%
rename from hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
rename to hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
index a82756a..2584643
--- a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
@@ -34,26 +34,26 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Tests for {@link HoodieStorageWriterFactory}.
+ * Tests for {@link HoodieFileWriterFactory}.
  */
-public class TestHoodieStorageWriterFactory extends HoodieClientTestBase {
+public class TestHoodieFileWriterFactory extends HoodieClientTestBase {
 
   @Test
-  public void testGetStorageWriter() throws IOException {
+  public void testGetFileWriter() throws IOException {
     // parquet file format.
     final String instantTime = "100";
     final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet");
     final HoodieWriteConfig cfg = getConfig();
     HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
     SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
-    HoodieStorageWriter<IndexedRecord> parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime,
+    HoodieFileWriter<IndexedRecord> parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
         parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
     assertTrue(parquetWriter instanceof HoodieParquetWriter);
 
     // other file format exception.
     final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
     final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
-      HoodieStorageWriter<IndexedRecord> logWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, logPath,
+      HoodieFileWriter<IndexedRecord> logWriter = HoodieFileWriterFactory.getFileWriter(instantTime, logPath,
           table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
     }, "should fail since log storage writer is not supported yet.");
     assertTrue(thrown.getMessage().contains("format not supported yet."));
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 6092fd8..0e94025 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -46,7 +47,9 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.index.HoodieIndex;
@@ -66,9 +69,9 @@ import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -90,37 +93,32 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
-  private HoodieParquetInputFormat roSnapshotInputFormat;
   private JobConf roSnapshotJobConf;
-
-  private HoodieParquetInputFormat roInputFormat;
   private JobConf roJobConf;
-
-  private HoodieParquetRealtimeInputFormat rtInputFormat;
   private JobConf rtJobConf;
 
-  @BeforeEach
-  public void init() throws IOException {
+  @TempDir
+  public java.nio.file.Path tempFolder;
+  private HoodieFileFormat baseFileFormat;
+
+  static Stream<HoodieFileFormat> argumentsProvider() {
+    return Stream.of(HoodieFileFormat.PARQUET);
+  }
+
+  public void init(HoodieFileFormat baseFileFormat) throws IOException {
+    this.baseFileFormat = baseFileFormat;
+
     initDFS();
     initSparkContexts("TestHoodieMergeOnReadTable");
     hadoopConf.addResource(dfs.getConf());
     initPath();
     dfs.mkdirs(new Path(basePath));
-    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, baseFileFormat);
     initTestDataGenerator();
 
-    // initialize parquet input format
-    roSnapshotInputFormat = new HoodieParquetInputFormat();
-    roSnapshotJobConf = new JobConf(jsc.hadoopConfiguration());
-    roSnapshotInputFormat.setConf(roSnapshotJobConf);
-
-    roInputFormat = new HoodieParquetInputFormat();
+    roSnapshotJobConf = new JobConf(hadoopConf);
     roJobConf = new JobConf(hadoopConf);
-    roInputFormat.setConf(roJobConf);
-
-    rtInputFormat = new HoodieParquetRealtimeInputFormat();
     rtJobConf = new JobConf(hadoopConf);
-    rtInputFormat.setConf(rtJobConf);
   }
 
   @AfterEach
@@ -128,8 +126,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     cleanupResources();
   }
 
-  @Test
-  public void testSimpleInsertAndUpdate() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testSimpleInsertAndUpdate(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     HoodieWriteConfig cfg = getConfig(true);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
@@ -153,9 +154,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
       client.compact(compactionCommitTime);
 
-      FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-      hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
+      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());
 
@@ -174,8 +176,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
   // test incremental read does not go past compaction instant for RO views
   // For RT views, incremental read can go past compaction
-  @Test
-  public void testIncrementalReadsWithCompaction() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testIncrementalReadsWithCompaction(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     String partitionPath = "2020/02/20"; // use only one partition for this test
     dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
     HoodieWriteConfig cfg = getConfig(true);
@@ -190,19 +195,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
       insertAndGetFilePaths(records001, client, cfg, commitTime1);
 
-      // verify only one parquet file shows up with commit time 001
+      // verify only one base file shows up with commit time 001
       FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath);
-      validateFiles(partitionPath,1, snapshotROFiles, roSnapshotInputFormat,
-          roSnapshotJobConf,200, commitTime1);
+      validateFiles(partitionPath, 1, snapshotROFiles, false, roSnapshotJobConf, 200, commitTime1);
 
       FileStatus[] incrementalROFiles = getROIncrementalFiles(partitionPath, true);
-      validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
-              roJobConf,200, commitTime1);
+      validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
       Path firstFilePath = incrementalROFiles[0].getPath();
 
       FileStatus[] incrementalRTFiles = getRTIncrementalFiles(partitionPath);
-      validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
-              rtJobConf,200, commitTime1);
+      validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf,200, commitTime1);
+
       assertEquals(firstFilePath, incrementalRTFiles[0].getPath());
 
       /**
@@ -215,14 +218,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
       // verify RO incremental reads - only one parquet file shows up because updates to into log files
       incrementalROFiles = getROIncrementalFiles(partitionPath, false);
-      validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
-              roJobConf, 200, commitTime1);
+      validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
       assertEquals(firstFilePath, incrementalROFiles[0].getPath());
 
       // verify RT incremental reads includes updates also
       incrementalRTFiles = getRTIncrementalFiles(partitionPath);
-      validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
-              rtJobConf, 200, commitTime1, updateTime);
+      validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime);
 
       // request compaction, but do not perform compaction
       String compactionCommitTime = "005";
@@ -230,13 +231,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
       // verify RO incremental reads - only one parquet file shows up because updates go into log files
       incrementalROFiles = getROIncrementalFiles(partitionPath, true);
-      validateFiles(partitionPath,1, incrementalROFiles, roInputFormat,
-              roJobConf, 200, commitTime1);
+      validateFiles(partitionPath,1, incrementalROFiles, false, roJobConf, 200, commitTime1);
 
       // verify RT incremental reads includes updates also
       incrementalRTFiles = getRTIncrementalFiles(partitionPath);
-      validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
-              rtJobConf, 200, commitTime1, updateTime);
+      validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime);
 
       // write 3 - more inserts
       String insertsTime = "006";
@@ -246,44 +245,44 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
       // verify new write shows up in snapshot mode even though there is pending compaction
       snapshotROFiles = getROSnapshotFiles(partitionPath);
-      validateFiles(partitionPath, 2, snapshotROFiles, roSnapshotInputFormat,
-          roSnapshotJobConf,400, commitTime1, insertsTime);
+      validateFiles(partitionPath, 2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, insertsTime);
 
       incrementalROFiles = getROIncrementalFiles(partitionPath, true);
       assertEquals(firstFilePath, incrementalROFiles[0].getPath());
       // verify 006 does not show up in RO mode because of pending compaction
-      validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
-              roJobConf, 200, commitTime1);
+      validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
 
       // verify that if stopAtCompaction is disabled, inserts from "insertsTime" show up
       incrementalROFiles = getROIncrementalFiles(partitionPath, false);
-      validateFiles(partitionPath,2, incrementalROFiles, roInputFormat,
-          roJobConf, 400, commitTime1, insertsTime);
+      validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, insertsTime);
 
       // verify 006 shows up in RT views
       incrementalRTFiles = getRTIncrementalFiles(partitionPath);
-      validateFiles(partitionPath, 2, incrementalRTFiles, rtInputFormat,
-              rtJobConf, 400, commitTime1, updateTime, insertsTime);
+      validateFiles(partitionPath, 2, incrementalRTFiles, true, rtJobConf, 400, commitTime1, updateTime, insertsTime);
 
       // perform the scheduled compaction
       client.compact(compactionCommitTime);
 
       // verify new write shows up in snapshot mode after compaction is complete
       snapshotROFiles = getROSnapshotFiles(partitionPath);
-      validateFiles(partitionPath,2, snapshotROFiles, roSnapshotInputFormat,
-          roSnapshotJobConf,400, commitTime1, compactionCommitTime, insertsTime);
+      validateFiles(partitionPath, 2, snapshotROFiles, false, roSnapshotJobConf, 400, commitTime1, compactionCommitTime,
+          insertsTime);
 
       incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true);
       assertTrue(incrementalROFiles.length == 2);
       // verify 006 shows up now that the requested compaction is complete
-      validateFiles(partitionPath, 2, incrementalROFiles, roInputFormat,
-              roJobConf, 400, commitTime1, compactionCommitTime, insertsTime);
+      validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, compactionCommitTime,
+                    insertsTime);
     }
   }
 
   // Check if record level metadata is aggregated properly at the end of write.
-  @Test
-  public void testMetadataAggregateFromWriteStatus() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testMetadataAggregateFromWriteStatus(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
@@ -305,8 +304,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @Test
-  public void testSimpleInsertUpdateAndDelete() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testSimpleInsertUpdateAndDelete(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     HoodieWriteConfig cfg = getConfig(true);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
@@ -332,7 +334,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
@@ -373,23 +375,25 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
+      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());
 
       List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath);
       // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
       assertEquals(0, recordsRead.size(), "Must contain 0 records");
     }
   }
 
-  @Test
-  public void testCOWToMORConvertedTableRollback() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
 
     // Set TableType to COW
-    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
+    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
 
     HoodieWriteConfig cfg = getConfig(true);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -425,14 +429,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       // Set TableType to MOR
-      HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+      HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, baseFileFormat);
 
       // rollback a COW commit when TableType is MOR
       client.rollback(newCommitTime);
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
-      FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
 
       final String absentCommit = newCommitTime;
@@ -440,8 +444,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @Test
-  public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
 
     HoodieWriteConfig cfg = getConfig(false);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -471,7 +477,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       tableView =
           getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
@@ -495,7 +501,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
 
         List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-        List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+        List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
+            basePath);
         assertEquals(recordsRead.size(), 200);
 
         statuses = secondClient.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
@@ -504,12 +511,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
         // Test failed delta commit rollback
         secondClient.rollback(commitTime1);
-        allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-        // After rollback, there should be no parquet file with the failed commit time
+        allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+        // After rollback, there should be no base file with the failed commit time
         assertEquals(0, Arrays.stream(allFiles)
             .filter(file -> file.getPath().getName().contains(commitTime1)).count());
         dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-        recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+        recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath);
         assertEquals(200, recordsRead.size());
       }
 
@@ -525,7 +532,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
 
         List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-        List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+        List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
+            basePath);
         assertEquals(200, recordsRead.size());
 
         writeRecords = jsc.parallelize(copyOfRecords, 1);
@@ -537,7 +545,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
         // Test successful delta commit rollback
         thirdClient.rollback(commitTime2);
-        allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+        allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
         // After rollback, there should be no parquet file with the failed commit time
         assertEquals(0, Arrays.stream(allFiles)
             .filter(file -> file.getPath().getName().contains(commitTime2)).count());
@@ -546,7 +554,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
         tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
         dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-        recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+        recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath);
         // check that the number of records read is still correct after rollback operation
         assertEquals(200, recordsRead.size());
 
@@ -569,7 +577,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
         thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty());
 
-        allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+        allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
         metaClient = HoodieTableMetaClient.reload(metaClient);
         tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
@@ -580,7 +588,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
         thirdClient.rollback(compactedCommitTime);
 
-        allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+        allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
         metaClient = HoodieTableMetaClient.reload(metaClient);
         tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
@@ -589,8 +597,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @Test
-  public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testMultiRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
 
     HoodieWriteConfig cfg = getConfig(false);
     try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -618,7 +628,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
@@ -641,7 +651,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
 
       List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
+          basePath);
       assertEquals(200, recordsRead.size());
 
       statuses = nClient.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect();
@@ -696,7 +707,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       JavaRDD<WriteStatus> ws = client.compact(compactionInstantTime);
       client.commitCompaction(compactionInstantTime, ws, Option.empty());
 
-      allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       metaClient = HoodieTableMetaClient.reload(metaClient);
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
@@ -724,7 +735,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       client.restoreToInstant("000");
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
@@ -751,8 +762,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         .build();
   }
 
-  @Test
-  public void testUpsertPartitioner() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testUpsertPartitioner(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     HoodieWriteConfig cfg = getConfig(true);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
@@ -778,7 +792,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
           metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -812,7 +826,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       roView = getHoodieTableFileSystemView(metaClient,
           hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
       dataFilesToRead = roView.getLatestBaseFiles();
@@ -823,14 +837,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       assertTrue(parquetFileIdToNewSize.entrySet().stream().anyMatch(entry -> parquetFileIdToSize.get(entry.getKey()) < entry.getValue()));
 
       List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
+          basePath);
       // Wrote 20 records in 2 batches
       assertEquals(40, recordsRead.size(), "Must contain 40 records");
     }
   }
 
-  @Test
-  public void testLogFileCountsAfterCompaction() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testLogFileCountsAfterCompaction(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     // insert 100 records
     HoodieWriteConfig config = getConfig(true);
     try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
@@ -902,8 +920,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @Test
-  public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testSimpleInsertsGeneratedIntoLogFiles(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -939,8 +960,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @Test
-  public void testInsertsGeneratedIntoLogFilesRollback(@TempDir java.nio.file.Path tempFolder) throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testInsertsGeneratedIntoLogFilesRollback(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -1010,8 +1034,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @Test
-  public void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -1063,8 +1090,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   /**
    * Test to ensure rolling stats are correctly written to metadata file.
    */
-  @Test
-  public void testRollingStatsInMetadata() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testRollingStatsInMetadata(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -1163,8 +1192,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   /**
    * Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them.
    */
-  @Test
-  public void testRollingStatsWithSmallFileHandling() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testRollingStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       Map<String, Long> fileIdToInsertsMap = new HashMap<>();
@@ -1296,8 +1328,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   /**
    * Test to validate invoking table.handleUpdate() with input records from multiple partitions will fail.
    */
-  @Test
-  public void testHandleUpdateWithMultiplePartitions() throws Exception {
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testHandleUpdateWithMultiplePartitions(HoodieFileFormat baseFileFormat) throws Exception {
+    init(baseFileFormat);
+
     HoodieWriteConfig cfg = getConfig(true);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
@@ -1323,7 +1358,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
       BaseFileOnlyView roView =
           getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -1401,7 +1436,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId());
     }
   }
-  
+
   private FileStatus[] insertAndGetFilePaths(List<HoodieRecord> records, HoodieWriteClient client,
                                              HoodieWriteConfig cfg, String commitTime) throws IOException {
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -1419,7 +1454,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().lastInstant();
     assertFalse(commit.isPresent());
 
-    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
     BaseFileOnlyView roView =
         getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -1452,14 +1487,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
-    return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
+    return listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
   }
 
   private FileStatus[] getROSnapshotFiles(String partitionPath)
       throws Exception {
-    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
     FileInputFormat.setInputPaths(roSnapshotJobConf, basePath + "/" + partitionPath);
-    return roSnapshotInputFormat.listStatus(roSnapshotJobConf);
+    return listStatus(roSnapshotJobConf, false);
   }
 
   private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction)
@@ -1469,10 +1504,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
   private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction)
           throws Exception {
-    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
     setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction);
     FileInputFormat.setInputPaths(roJobConf, Paths.get(basePath, partitionPath).toString());
-    return roInputFormat.listStatus(roJobConf);
+    return listStatus(roJobConf, false);
   }
 
   private FileStatus[] getRTIncrementalFiles(String partitionPath)
@@ -1482,10 +1516,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
   private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull)
           throws Exception {
-    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
     setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false);
     FileInputFormat.setInputPaths(rtJobConf, Paths.get(basePath, partitionPath).toString());
-    return rtInputFormat.listStatus(rtJobConf);
+    return listStatus(rtJobConf, true);
   }
 
   private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) {
@@ -1507,16 +1540,37 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   }
 
   private void validateFiles(String partitionPath, int expectedNumFiles,
-                             FileStatus[] files, HoodieParquetInputFormat inputFormat,
-                             JobConf jobConf, int expectedRecords, String... expectedCommits) {
+                             FileStatus[] files, boolean realtime, JobConf jobConf,
+                             int expectedRecords, String... expectedCommits) {
 
     assertEquals(expectedNumFiles, files.length);
     Set<String> expectedCommitsSet = Arrays.stream(expectedCommits).collect(Collectors.toSet());
-    List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
-        Collections.singletonList(Paths.get(basePath, partitionPath).toString()), basePath, jobConf, inputFormat);
+    List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf,
+        Collections.singletonList(Paths.get(basePath, partitionPath).toString()), basePath, jobConf, realtime);
     assertEquals(expectedRecords, records.size());
     Set<String> actualCommits = records.stream().map(r ->
             r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()).collect(Collectors.toSet());
     assertEquals(expectedCommitsSet, actualCommits);
   }
+
+  private FileStatus[] listAllDataFilesInPath(HoodieTable table, String basePath) throws IOException {
+    return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), basePath, table.getBaseFileExtension());
+  }
+
+  private FileStatus[] listStatus(JobConf jobConf, boolean realtime) throws IOException {
+    // This is required as Hoodie InputFormats do not extend a common base class and FileInputFormat's
+    // listStatus() is protected.
+    FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(baseFileFormat, realtime, jobConf);
+    switch (baseFileFormat) {
+      case PARQUET:
+        if (realtime) {
+          return ((HoodieParquetRealtimeInputFormat)inputFormat).listStatus(jobConf);
+        } else {
+          return ((HoodieParquetInputFormat)inputFormat).listStatus(jobConf);
+        }
+      default:
+        throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
+    }
+  }
 }
+
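Note on the parameterization above: every test in TestHoodieMergeOnReadTable is converted from @Test to @ParameterizedTest with @MethodSource("argumentsProvider"), so each run receives the base file format to exercise and calls init(baseFileFormat) first. The provider's body does not appear in this hunk; a minimal sketch of what it could look like, assuming Parquet is the only base file format wired into these tests so far:

    import java.util.stream.Stream;

    import org.apache.hudi.common.model.HoodieFileFormat;
    import org.junit.jupiter.params.provider.Arguments;

    // Assumed shape of the factory referenced by @MethodSource("argumentsProvider"):
    // a static, zero-argument method returning one Arguments entry per base file format.
    static Stream<Arguments> argumentsProvider() {
      return Stream.of(Arguments.of(HoodieFileFormat.PARQUET));
    }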
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 698f901..145cde5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -181,15 +182,20 @@ public class HoodieClientTestUtils {
   /**
    * Obtain all new data written into the Hoodie table since the given timestamp.
    */
-  public static Dataset<Row> readSince(String basePath, SQLContext sqlContext, HoodieTimeline commitTimeline,
-                                       String lastCommitTime) {
+  public static Dataset<Row> readSince(String basePath, SQLContext sqlContext,
+                                       HoodieTimeline commitTimeline, String lastCommitTime) {
     List<HoodieInstant> commitsToReturn =
         commitTimeline.findInstantsAfter(lastCommitTime, Integer.MAX_VALUE).getInstants().collect(Collectors.toList());
     try {
       // Go over the commit metadata, and obtain the new files that need to be read.
       HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
-      return sqlContext.read().parquet(fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]))
-          .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime));
+      String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]);
+      Dataset<Row> rows = null;
+      if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        rows = sqlContext.read().parquet(paths);
+      }
+
+      return rows.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime));
     } catch (IOException e) {
       throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + lastCommitTime, e);
     }
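The branch added to readSince() above only recognizes the Parquet extension, so rows stays null for any other base file format. A hedged sketch of how the dispatch inside the same method could be generalized (the ORC branch, its extension and the error message are illustrative, not part of this commit; the names reuse the class's existing imports):

    Dataset<Row> rows;
    if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
      rows = sqlContext.read().parquet(paths);
    } else if (paths[0].endsWith(".orc")) {
      // hypothetical ORC base files; DataFrameReader#orc(String...) is a standard Spark call
      rows = sqlContext.read().orc(paths);
    } else {
      throw new HoodieException("Unsupported base file format for incremental read: " + paths[0]);
    }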
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
index 23bd333..20bcafd 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
@@ -19,18 +19,19 @@
 package org.apache.hudi.testutils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
-
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -45,20 +46,19 @@ import java.util.stream.Collectors;
  * Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR).
  */
 public class HoodieMergeOnReadTestUtils {
-  public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath) {
-    return getRecordsUsingInputFormat(inputPaths, basePath, new Configuration());
-  }
-
-  public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath,
-      Configuration conf) {
-    JobConf jobConf = new JobConf(conf);
-    return getRecordsUsingInputFormat(inputPaths, basePath, jobConf, new HoodieParquetRealtimeInputFormat());
+  public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths,
+                                                               String basePath) {
+    return getRecordsUsingInputFormat(conf, inputPaths, basePath, new JobConf(conf), true);
   }
 
-  public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths,
+  public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths,
                                                                String basePath,
                                                                JobConf jobConf,
-                                                               HoodieParquetInputFormat inputFormat) {
+                                                               boolean realtime) {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath);
+    FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(),
+        realtime, jobConf);
+
     Schema schema = HoodieAvroUtils.addMetadataFields(
         new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
     setPropsForInputFormat(inputFormat, jobConf, schema, basePath);
@@ -93,8 +93,8 @@ public class HoodieMergeOnReadTestUtils {
     }).orElse(new ArrayList<GenericRecord>());
   }
 
-  private static void setPropsForInputFormat(HoodieParquetInputFormat inputFormat, JobConf jobConf,
-      Schema schema, String basePath) {
+  private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema,
+                                             String basePath) {
     List<Schema.Field> fields = schema.getFields();
     String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
     String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
@@ -116,7 +116,10 @@ public class HoodieMergeOnReadTestUtils {
     conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
     conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
     conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
-    inputFormat.setConf(conf);
+
+    // Hoodie InputFormats also implement Configurable, so pass the configuration through
+    Configurable configurable = (Configurable) inputFormat;
+    configurable.setConf(conf);
     jobConf.addResource(conf);
   }
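With the refactor above, callers of HoodieMergeOnReadTestUtils pass the Hadoop Configuration plus a realtime flag, and HoodieInputFormatUtils.getInputFormat() resolves the concrete InputFormat from the table's configured base file format (HoodieParquetRealtimeInputFormat or HoodieParquetInputFormat for Parquet). A small usage sketch; the base path and partition path are illustrative:

    Configuration conf = new Configuration();
    JobConf jobConf = new JobConf(conf);
    String basePath = "/tmp/hoodie_table";                               // illustrative
    List<String> inputPaths =
        Collections.singletonList(basePath + "/2020/06/26");             // illustrative partition
    // realtime = true merges base files with their log files before returning records
    List<GenericRecord> records =
        HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, true);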
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index e7adaa6..fffd43f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -107,15 +107,21 @@ public class FSUtils {
     return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId);
   }
 
+  // TODO: this should be removed
   public static String makeDataFileName(String instantTime, String writeToken, String fileId) {
-    return String.format("%s_%s_%s.parquet", fileId, writeToken, instantTime);
+    return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, HoodieFileFormat.PARQUET.getFileExtension());
+  }
+
+  public static String makeDataFileName(String instantTime, String writeToken, String fileId, String fileExtension) {
+    return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, fileExtension);
   }
 
   public static String makeMarkerFile(String instantTime, String writeToken, String fileId) {
     return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, HoodieTableMetaClient.MARKER_EXTN);
   }
 
-  public static String translateMarkerToDataPath(String basePath, String markerPath, String instantTs) {
+  public static String translateMarkerToDataPath(String basePath, String markerPath, String instantTs,
+                                                 String baseFileExtension) {
     ValidationUtils.checkArgument(markerPath.endsWith(HoodieTableMetaClient.MARKER_EXTN));
     String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
         new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTs))).toString();
@@ -123,8 +129,7 @@ public class FSUtils {
     ValidationUtils.checkArgument(begin >= 0,
         "Not in marker dir. Marker Path=" + markerPath + ", Expected Marker Root=" + markerRootPath);
     String rPath = markerPath.substring(begin + markerRootPath.length() + 1);
-    return String.format("%s/%s%s", basePath, rPath.replace(HoodieTableMetaClient.MARKER_EXTN, ""),
-        HoodieFileFormat.PARQUET.getFileExtension());
+    return String.format("%s/%s%s", basePath, rPath.replace(HoodieTableMetaClient.MARKER_EXTN, ""), baseFileExtension);
   }
 
   public static String maskWithoutFileId(String instantTime, int taskPartitionId) {
@@ -195,12 +200,12 @@ public class FSUtils {
   }
 
   public static List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
-      String markerDir) throws IOException {
+      String markerDir, String baseFileExtension) throws IOException {
     List<String> dataFiles = new LinkedList<>();
     processFiles(fs, markerDir, (status) -> {
       String pathStr = status.getPath().toString();
       if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
-        dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs));
+        dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs, baseFileExtension));
       }
       return true;
     }, false);
@@ -545,4 +550,13 @@ public class FSUtils {
         || inputStream.getWrappedStream().getClass().getCanonicalName()
             .equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream");
   }
+
+  public static Configuration registerFileSystem(Path file, Configuration conf) {
+    Configuration returnConf = new Configuration(conf);
+    String scheme = FSUtils.getFs(file.toString(), conf).getScheme();
+    returnConf.set("fs." + HoodieWrapperFileSystem.getHoodieScheme(scheme) + ".impl",
+        HoodieWrapperFileSystem.class.getName());
+    return returnConf;
+  }
+
 }
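The new FSUtils overload above threads the base file extension through instead of hardcoding ".parquet"; the extension-less variant is kept only for compatibility (hence the TODO). A small example of the resulting file name, with illustrative instant time, write token and file id:

    // Produces "fileId_1-0-1_20200626064700.parquet" for a Parquet base file
    String dataFileName = FSUtils.makeDataFileName(
        "20200626064700", "1-0-1", "fileId", HoodieFileFormat.PARQUET.getFileExtension());

    // translateMarkerToDataPath(...) now needs the same extension to map a ".marker" file
    // back to the data file it stands for.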
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 3218ddf..9675b77 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -128,7 +128,8 @@ public class HoodieTableMetaClient implements Serializable {
     }
     this.timelineLayoutVersion = layoutVersion.orElseGet(() -> tableConfig.getTimelineLayoutVersion().get());
     this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
-    LOG.info("Finished Loading Table of type " + tableType + "(version=" + timelineLayoutVersion + ") from " + basePath);
+    LOG.info("Finished Loading Table of type " + tableType + "(version=" + timelineLayoutVersion + ", baseFileFormat="
+        + this.tableConfig.getBaseFileFormat() + ") from " + basePath);
     if (loadActiveTimelineOnLoad) {
       LOG.info("Loading Active commit timeline for " + basePath);
       getActiveTimeline();
@@ -299,12 +300,22 @@ public class HoodieTableMetaClient implements Serializable {
   }
 
   /**
-   * Helper method to initialize a table, with given basePath, tableType, name, archiveFolder.
+   * Helper method to initialize a table, with given basePath, tableType, name, archiveFolder, payloadClass.
    */
   public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, String tableType,
       String tableName, String archiveLogFolder, String payloadClassName) throws IOException {
     return initTableType(hadoopConf, basePath, HoodieTableType.valueOf(tableType), tableName,
-        archiveLogFolder, payloadClassName, null);
+        archiveLogFolder, payloadClassName, null, null);
+  }
+
+  /**
+   * Helper method to initialize a table, with given basePath, tableType, name, archiveFolder, payloadClass and
+   * base file format.
+   */
+  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, String tableType,
+      String tableName, String archiveLogFolder, String payloadClassName, String baseFileFormat) throws IOException {
+    return initTableType(hadoopConf, basePath, HoodieTableType.valueOf(tableType), tableName,
+        archiveLogFolder, payloadClassName, null, baseFileFormat);
   }
 
   /**
@@ -312,12 +323,20 @@ public class HoodieTableMetaClient implements Serializable {
    */
   public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
       HoodieTableType tableType, String tableName, String payloadClassName) throws IOException {
-    return initTableType(hadoopConf, basePath, tableType, tableName, null, payloadClassName, null);
+    return initTableType(hadoopConf, basePath, tableType, tableName, null, payloadClassName, null, null);
+  }
+
+  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
+                                                    HoodieTableType tableType, String tableName,
+                                                    String archiveLogFolder, String payloadClassName,
+                                                    Integer timelineLayoutVersion) throws IOException {
+    return initTableType(hadoopConf, basePath, tableType, tableName, archiveLogFolder, payloadClassName,
+        timelineLayoutVersion, null);
   }
 
   public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
       HoodieTableType tableType, String tableName, String archiveLogFolder, String payloadClassName,
-      Integer timelineLayoutVersion) throws IOException {
+      Integer timelineLayoutVersion, String baseFileFormat) throws IOException {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
     properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
@@ -332,6 +351,10 @@ public class HoodieTableMetaClient implements Serializable {
     if (null != timelineLayoutVersion) {
       properties.put(HoodieTableConfig.HOODIE_TIMELINE_LAYOUT_VERSION, String.valueOf(timelineLayoutVersion));
     }
+
+    if (null != baseFileFormat) {
+      properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, baseFileFormat.toUpperCase());
+    }
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
   }
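The initTableType() overloads above accept an optional base file format, which is written to hoodie.properties under HOODIE_BASE_FILE_FORMAT_PROP_NAME when non-null. A usage sketch; the base path, table name, archive folder and the hadoopConf variable are illustrative:

    // Initialize a MERGE_ON_READ table whose base files are Parquet.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.initTableType(
        hadoopConf,
        "/tmp/hoodie_table",
        "MERGE_ON_READ",
        "test_table",
        "archived",
        "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
        "PARQUET");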
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 129f85f..db68667 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -356,20 +356,7 @@ public class TableSchemaResolver {
    * @return
    */
   public MessageType readSchemaFromLogFile(Path path) throws IOException {
-    FileSystem fs = metaClient.getRawFs();
-    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
-    HoodieAvroDataBlock lastBlock = null;
-    while (reader.hasNext()) {
-      HoodieLogBlock block = reader.next();
-      if (block instanceof HoodieAvroDataBlock) {
-        lastBlock = (HoodieAvroDataBlock) block;
-      }
-    }
-    reader.close();
-    if (lastBlock != null) {
-      return new AvroSchemaConverter().convert(lastBlock.getSchema());
-    }
-    return null;
+    return readSchemaFromLogFile(metaClient.getRawFs(), path);
   }
 
   /**
@@ -394,11 +381,11 @@ public class TableSchemaResolver {
    */
   public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
     Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
-    HoodieAvroDataBlock lastBlock = null;
+    HoodieDataBlock lastBlock = null;
     while (reader.hasNext()) {
       HoodieLogBlock block = reader.next();
-      if (block instanceof HoodieAvroDataBlock) {
-        lastBlock = (HoodieAvroDataBlock) block;
+      if (block instanceof HoodieDataBlock) {
+        lastBlock = (HoodieDataBlock) block;
       }
     }
     reader.close();
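The schema readers above now look for the last HoodieDataBlock of any type instead of specifically a HoodieAvroDataBlock. Call sites are unchanged; for example (the log file path and fs variable are illustrative):

    // Returns the writer schema of the last data block as a Parquet MessageType,
    // or null if the log file contains no data blocks.
    MessageType schema = TableSchemaResolver.readSchemaFromLogFile(
        fs, new Path("/tmp/hoodie_table/2020/06/26/.fileId_20200626064700.log.1"));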
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index a35a9ee..806d55f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -267,7 +268,7 @@ public abstract class AbstractHoodieLogRecordScanner {
    * Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
    * handle it.
    */
-  private void processAvroDataBlock(HoodieAvroDataBlock dataBlock) throws Exception {
+  private void processDataBlock(HoodieDataBlock dataBlock) throws Exception {
     // TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here
     List<IndexedRecord> recs = dataBlock.getRecords();
     totalLogRecords.addAndGet(recs.size());
@@ -302,7 +303,7 @@ public abstract class AbstractHoodieLogRecordScanner {
       HoodieLogBlock lastBlock = lastBlocks.pollLast();
       switch (lastBlock.getBlockType()) {
         case AVRO_DATA_BLOCK:
-          processAvroDataBlock((HoodieAvroDataBlock) lastBlock);
+          processDataBlock((HoodieAvroDataBlock) lastBlock);
           break;
         case DELETE_BLOCK:
           Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 1f349ad..38bf83c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -193,7 +193,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
         if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
           return HoodieAvroDataBlock.getBlock(content, readerSchema);
         } else {
-          return HoodieAvroDataBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
+          return new HoodieAvroDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
               contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
         }
       case DELETE_BLOCK:
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index bb079de..0662e68 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -49,8 +49,8 @@ public class LogReaderUtils {
     HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
     while (reader.hasPrev()) {
       HoodieLogBlock block = reader.prev();
-      if (block instanceof HoodieAvroDataBlock) {
-        HoodieAvroDataBlock lastBlock = (HoodieAvroDataBlock) block;
+      if (block instanceof HoodieDataBlock) {
+        HoodieDataBlock lastBlock = (HoodieDataBlock) block;
         if (completedTimeline
             .containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME))) {
           writerSchema = new Schema.Parser().parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 0b3e822..31fc352 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -36,8 +36,6 @@ import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 
-import javax.annotation.Nonnull;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -49,56 +47,42 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.Nonnull;
+
 /**
- * DataBlock contains a list of records serialized using Avro. The Datablock contains 1. Data Block version 2. Total
- * number of records in the block 3. Size of a record 4. Actual avro serialized content of the record
+ * HoodieAvroDataBlock contains a list of records serialized using Avro. It is used with the Parquet base file format.
  */
-public class HoodieAvroDataBlock extends HoodieLogBlock {
+public class HoodieAvroDataBlock extends HoodieDataBlock {
 
-  private List<IndexedRecord> records;
-  private Schema schema;
   private ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
   private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
 
-  public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
-      @Nonnull Map<HeaderMetadataType, String> footer) {
-    super(header, footer, Option.empty(), Option.empty(), null, false);
-    this.records = records;
-    this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+  public HoodieAvroDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
+       @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+       @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
+       FSDataInputStream inputStream, boolean readBlockLazily) {
+    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
   }
 
-  public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
-    this(records, header, new HashMap<>());
+  public HoodieAvroDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
+       boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
+       Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
+    super(content, inputStream, readBlockLazily,
+          Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
+          footer);
   }
 
-  private HoodieAvroDataBlock(Option<byte[]> content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily,
-      Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
-      @Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType, String> footer) {
-    super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
-    this.schema = readerSchema;
+  public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
+    super(records, header, new HashMap<>());
   }
 
-  public static HoodieLogBlock getBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
-      boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
-      Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
-
-    return new HoodieAvroDataBlock(content, inputStream, readBlockLazily,
-        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
-        footer);
-
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.AVRO_DATA_BLOCK;
   }
 
   @Override
-  public byte[] getContentBytes() throws IOException {
-
-    // In case this method is called before realizing records from content
-    if (getContent().isPresent()) {
-      return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == null) {
-      // read block lazily
-      createRecordsFromContentBytes();
-    }
-
+  protected byte[] serializeRecords() throws IOException {
     Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
     GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -137,40 +121,10 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
     return baos.toByteArray();
   }
 
-  @Override
-  public HoodieLogBlockType getBlockType() {
-    return HoodieLogBlockType.AVRO_DATA_BLOCK;
-  }
-
-  public List<IndexedRecord> getRecords() {
-    if (records == null) {
-      try {
-        // in case records are absent, read content lazily and then convert to IndexedRecords
-        createRecordsFromContentBytes();
-      } catch (IOException io) {
-        throw new HoodieIOException("Unable to convert content bytes to records", io);
-      }
-    }
-    return records;
-  }
-
-  public Schema getSchema() {
-    // if getSchema was invoked before converting byte [] to records
-    if (records == null) {
-      getRecords();
-    }
-    return schema;
-  }
-
   // TODO (na) - Break down content into smaller chunks of byte [] to be GC as they are used
   // TODO (na) - Implement a recordItr instead of recordList
-  private void createRecordsFromContentBytes() throws IOException {
-
-    if (readBlockLazily && !getContent().isPresent()) {
-      // read log block contents from disk
-      inflate();
-    }
-
+  @Override
+  protected void deserializeRecords() throws IOException {
     SizeAwareDataInputStream dis =
         new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(getContent().get())));
 
@@ -212,6 +166,9 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
 
   //----------------------------------------------------------------------------------------
   //                                  DEPRECATED METHODS
+  //
+  // These methods were only ever supported by HoodieAvroDataBlock and have been deprecated.
+  // Hence they remain implemented here, on the concrete class, even though they duplicate
+  // some of the serialization logic now shared through HoodieDataBlock.
   //----------------------------------------------------------------------------------------
 
   /**
@@ -230,7 +187,7 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
    * HoodieLogFormat V1.
    */
   @Deprecated
-  public static HoodieLogBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
+  public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
 
     SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));
 
@@ -302,5 +259,4 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
     output.close();
     return baos.toByteArray();
   }
-
 }
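
As a rough usage sketch mirroring the test changes further down in this diff (schema, records and the log writer are assumed to already exist), constructing and appending an Avro data block now goes through the HoodieDataBlock type:

    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
    // either construct the subclass directly ...
    HoodieDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
    // ... or dispatch on the block type: HoodieDataBlock.getBlock(HoodieLogBlockType.AVRO_DATA_BLOCK, records, header)
    writer = writer.appendBlock(dataBlock);
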
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
new file mode 100644
index 0000000..1a70fc3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A DataBlock holds a list of records serialized in a format compatible with the table's base file
+ * format. For each base file format there is a corresponding DataBlock implementation.
+ *
+ * Each DataBlock contains:
+ *   1. Data Block version
+ *   2. Total number of records in the block
+ *   3. Actual serialized content of the records
+ */
+public abstract class HoodieDataBlock extends HoodieLogBlock {
+
+  protected List<IndexedRecord> records;
+  protected Schema schema;
+
+  public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
+      @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+      @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
+      FSDataInputStream inputStream, boolean readBlockLazily) {
+    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
+  }
+
+  public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
+      @Nonnull Map<HeaderMetadataType, String> footer) {
+    super(header, footer, Option.empty(), Option.empty(), null, false);
+    this.records = records;
+    this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+  }
+
+  public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
+    this(records, header, new HashMap<>());
+  }
+
+  protected HoodieDataBlock(Option<byte[]> content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily,
+      Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
+      @Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType, String> footer) {
+    super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
+    this.schema = readerSchema;
+  }
+
+  public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
+                                        Map<HeaderMetadataType, String> header) {
+    switch (logDataBlockFormat) {
+      case AVRO_DATA_BLOCK:
+        return new HoodieAvroDataBlock(recordList, header);
+      default:
+        throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
+    }
+  }
+
+  @Override
+  public byte[] getContentBytes() throws IOException {
+    // In case this method is called before realizing records from content
+    if (getContent().isPresent()) {
+      return getContent().get();
+    } else if (readBlockLazily && !getContent().isPresent() && records == null) {
+      // read block lazily
+      createRecordsFromContentBytes();
+    }
+
+    return serializeRecords();
+  }
+
+  public abstract HoodieLogBlockType getBlockType();
+
+  public List<IndexedRecord> getRecords() {
+    if (records == null) {
+      try {
+        // in case records are absent, read content lazily and then convert to IndexedRecords
+        createRecordsFromContentBytes();
+      } catch (IOException io) {
+        throw new HoodieIOException("Unable to convert content bytes to records", io);
+      }
+    }
+    return records;
+  }
+
+  public Schema getSchema() {
+    // if getSchema was invoked before converting byte [] to records
+    if (records == null) {
+      getRecords();
+    }
+    return schema;
+  }
+
+  private void createRecordsFromContentBytes() throws IOException {
+    if (readBlockLazily && !getContent().isPresent()) {
+      // read log block contents from disk
+      inflate();
+    }
+
+    deserializeRecords();
+  }
+
+  protected abstract byte[] serializeRecords() throws IOException;
+
+  protected abstract void deserializeRecords() throws IOException;
+}
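
Supporting another base file format means adding a sibling of HoodieAvroDataBlock that implements the hooks above. A minimal, purely hypothetical sketch (HoodieExampleDataBlock and its block type do not exist in this patch; imports would mirror HoodieAvroDataBlock):

    public class HoodieExampleDataBlock extends HoodieDataBlock {

      public HoodieExampleDataBlock(List<IndexedRecord> records, Map<HeaderMetadataType, String> header) {
        super(records, header);
      }

      @Override
      public HoodieLogBlockType getBlockType() {
        // a real format would return its own HoodieLogBlockType constant
        return HoodieLogBlockType.AVRO_DATA_BLOCK; // placeholder for the sketch
      }

      @Override
      protected byte[] serializeRecords() throws IOException {
        // encode this.records into the format's on-disk byte layout
        return new byte[0];
      }

      @Override
      protected void deserializeRecords() throws IOException {
        // decode getContent().get() back into IndexedRecords and assign this.records
        this.records = new ArrayList<>();
      }
    }

A real implementation would also register itself in the getBlock(HoodieLogBlockType, ...) factory above and declare a dedicated HoodieLogBlockType value.
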
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 24144d7..b1a88c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -84,9 +84,7 @@ public abstract class HoodieLogBlock {
     throw new HoodieException("No implementation was provided");
   }
 
-  public HoodieLogBlockType getBlockType() {
-    throw new HoodieException("No implementation was provided");
-  }
+  public abstract HoodieLogBlockType getBlockType();
 
   public long getLogBlockLength() {
     throw new HoodieException("No implementation was provided");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 9babc80..4fdf20b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -121,7 +121,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       }
     });
     long storePartitionsTs = timer.endTimer();
-    LOG.info("addFilesToView: NumFiles=" + statuses.length + ", FileGroupsCreationTime=" + fgBuildTimeTakenMs
+    LOG.info("addFilesToView: NumFiles=" + statuses.length + ", NumFileGroups=" + fileGroups.size()
+        + ", FileGroupsCreationTime=" + fgBuildTimeTakenMs
         + ", StoreTimeTaken=" + storePartitionsTs);
     return fileGroups;
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/utils/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/client/utils/ParquetReaderIterator.java
rename to hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
index 2bf5c78..20c79dd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/utils/ParquetReaderIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.client.utils;
+package org.apache.hudi.common.util;
 
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
 import org.apache.hudi.exception.HoodieIOException;
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
similarity index 66%
copy from hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
copy to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
index d8cdb0f..0e5ead9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
@@ -18,19 +18,27 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.hudi.common.model.HoodieRecord;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.bloom.BloomFilter;
 
-import java.io.IOException;
+public interface HoodieFileReader<R extends IndexedRecord> {
+
+  public String[] readMinMaxRecordKeys();
+
+  public BloomFilter readBloomFilter();
 
-public interface HoodieStorageWriter<R extends IndexedRecord> {
+  public Set<String> filterRowKeys(Set<String> candidateRowKeys);
 
-  void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;
+  public Iterator<R> getRecordIterator(Schema schema) throws IOException;
 
-  boolean canWrite();
+  Schema getSchema();
 
-  void close() throws IOException;
+  void close();
 
-  void writeAvro(String key, R oldRecord) throws IOException;
+  long getTotalRecords();
 }
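
Readers of any base file format can now be consumed uniformly through this interface; as an illustrative helper (not part of the patch):

    static long countRecords(HoodieFileReader<IndexedRecord> reader) throws IOException {
      Iterator<IndexedRecord> it = reader.getRecordIterator(reader.getSchema());
      long count = 0;
      while (it.hasNext()) {
        it.next();
        count++;
      }
      reader.close();
      return count;
    }
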
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
new file mode 100644
index 0000000..1ad85d3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+
+public class HoodieFileReaderFactory {
+
+  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileReader<R> getFileReader(
+      Configuration conf, Path path) throws IOException {
+    final String extension = FSUtils.getFileExtension(path.toString());
+    if (PARQUET.getFileExtension().equals(extension)) {
+      return newParquetFileReader(conf, path);
+    }
+    throw new UnsupportedOperationException(extension + " format not supported yet.");
+  }
+
+  private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileReader<R> newParquetFileReader(
+      Configuration conf, Path path) throws IOException {
+    return new HoodieParquetReader<>(conf, path);
+  }
+}
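
The factory dispatches purely on the file extension; a short usage sketch (the path is the same synthetic one used by the new unit test below):

    Configuration conf = new Configuration();
    Path parquetPath = new Path("/partition/path/f1_1-0-1_000.parquet");
    HoodieFileReader<IndexedRecord> reader = HoodieFileReaderFactory.getFileReader(conf, parquetPath);

Any extension other than .parquet currently results in an UnsupportedOperationException, as exercised by TestHoodieFileReaderFactory further down.
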
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
new file mode 100644
index 0000000..107f503
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+
+public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader {
+  private Path path;
+  private Configuration conf;
+
+  public HoodieParquetReader(Configuration configuration, Path path) {
+    this.conf = configuration;
+    this.path = path;
+  }
+
+  public String[] readMinMaxRecordKeys() {
+    return ParquetUtils.readMinMaxRecordKeys(conf, path);
+  }
+
+  @Override
+  public BloomFilter readBloomFilter() {
+    return ParquetUtils.readBloomFilterFromParquetMetadata(conf, path);
+  }
+
+  @Override
+  public Set<String> filterRowKeys(Set candidateRowKeys) {
+    return ParquetUtils.filterParquetRowKeys(conf, path, candidateRowKeys);
+  }
+
+  @Override
+  public Iterator<R> getRecordIterator(Schema schema) throws IOException {
+    AvroReadSupport.setAvroReadSchema(conf, schema);
+    ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(path).withConf(conf).build();
+    return new ParquetReaderIterator(reader);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return ParquetUtils.readAvroSchema(conf, path);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public long getTotalRecords() {
+    // TODO: not implemented for parquet yet; always returns 0
+    return 0;
+  }
+}
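
The parquet implementation delegates to ParquetUtils for footer metadata. A sketch of how callers might use it for key pruning (conf, baseFilePath and the candidate keys are assumptions of the sketch):

    HoodieFileReader<IndexedRecord> reader = HoodieFileReaderFactory.getFileReader(conf, baseFilePath);
    String[] minMaxKeys = reader.readMinMaxRecordKeys();    // min/max record keys from the footer
    BloomFilter bloomFilter = reader.readBloomFilter();     // bloom filter stored in parquet metadata
    Set<String> present = reader.filterRowKeys(new HashSet<>(Arrays.asList("key-001", "key-002")));
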
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java
similarity index 65%
rename from hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
rename to hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java
index d8cdb0f..940ae87 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriter.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java
@@ -16,21 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.common.functional;
 
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 
-import org.apache.avro.generic.IndexedRecord;
-
-import java.io.IOException;
-
-public interface HoodieStorageWriter<R extends IndexedRecord> {
-
-  void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;
-
-  boolean canWrite();
-
-  void close() throws IOException;
-
-  void writeAvro(String key, R oldRecord) throws IOException;
+/**
+ * Tests Avro log format {@link HoodieAvroDataBlock}.
+ */
+public class TestHoodieAvroLogFormat extends TestHoodieLogFormat {
+  public TestHoodieAvroLogFormat() {
+    super(HoodieLogBlockType.AVRO_DATA_BLOCK);
+  }
 }
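
A test suite for any future block format would plug in the same way; hypothetical sketch only (EXAMPLE_DATA_BLOCK is not a real enum value):

    public class TestHoodieExampleLogFormat extends TestHoodieLogFormat {
      public TestHoodieExampleLogFormat() {
        super(HoodieLogBlockType.EXAMPLE_DATA_BLOCK); // hypothetical block type
      }
    }

together with a matching case in the getDataBlock helper added at the end of the next file's diff.
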
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index e9f0ef7..6214af1 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
@@ -81,12 +82,20 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * Tests hoodie log format {@link HoodieLogFormat}.
  */
 @SuppressWarnings("Duplicates")
-public class TestHoodieLogFormat extends HoodieCommonTestHarness {
+public abstract class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
   private static String BASE_OUTPUT_PATH = "/tmp/";
   private FileSystem fs;
   private Path partitionPath;
   private int bufferSize = 4096;
+  private HoodieLogBlockType dataBlockType;
+
+  public TestHoodieLogFormat(HoodieLogBlockType dataBlockType) {
+    this.dataBlockType = dataBlockType;
+  }
+
+  private TestHoodieLogFormat() {
+  }
 
   @BeforeAll
   public static void setUpClass() throws IOException, InterruptedException {
@@ -133,7 +142,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    HoodieDataBlock dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     long size = writer.getCurrentSize();
     assertTrue(size > 0, "We just wrote a block - size should be > 0");
@@ -151,7 +160,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    HoodieDataBlock dataBlock = getDataBlock(records, header);
     // Write out a block
     writer = writer.appendBlock(dataBlock);
     // Get the size of the block
@@ -164,7 +173,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).withSizeThreshold(size - 1).build();
     records = SchemaTestUtil.generateTestRecords(0, 100);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = new HoodieAvroDataBlock(records, header);
+    dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     assertEquals(0, writer.getCurrentSize(), "This should be a new log file and hence size should be 0");
     assertEquals(2, writer.getLogFile().getLogVersion(), "Version should be rolled to 2");
@@ -217,7 +226,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    HoodieDataBlock dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     writer2 = writer2.appendBlock(dataBlock);
     writer.close();
@@ -235,7 +244,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    HoodieDataBlock dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     long size1 = writer.getCurrentSize();
     writer.close();
@@ -245,7 +254,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
     records = SchemaTestUtil.generateTestRecords(0, 100);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = new HoodieAvroDataBlock(records, header);
+    dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     long size2 = writer.getCurrentSize();
     assertTrue(size2 > size1, "We just wrote a new block - size2 should be > size1");
@@ -259,7 +268,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
     records = SchemaTestUtil.generateTestRecords(0, 100);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = new HoodieAvroDataBlock(records, header);
+    dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     long size3 = writer.getCurrentSize();
     assertTrue(size3 > size2, "We just wrote a new block - size3 should be > size2");
@@ -289,7 +298,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
    *       dataBlock = new HoodieAvroDataBlock(records, header); writer = writer.appendBlock(dataBlock); long size1 =
    *       writer.getCurrentSize(); // do not close this writer - this simulates a data note appending to a log dying
    *       without closing the file // writer.close();
-   * 
+   *
    *       writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
    *       .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100")
    *       .withFs(fs).build(); records = SchemaTestUtil.generateTestRecords(0, 100);
@@ -313,7 +322,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    HoodieDataBlock dataBlock = getDataBlock(records, header);
 
     for (int i = 0; i < 2; i++) {
       HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
@@ -338,15 +347,15 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    HoodieDataBlock dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
     Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it");
     HoodieLogBlock nextBlock = reader.next();
-    assertEquals(HoodieLogBlockType.AVRO_DATA_BLOCK, nextBlock.getBlockType(), "The next block should be a data block");
-    HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock;
+    assertEquals(dataBlockType, nextBlock.getBlockType(), "The next block should be a data block");
+    HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
     assertEquals(copyOfRecords.size(), dataBlockRead.getRecords().size(),
         "Read records size should be equal to the written records size");
     assertEquals(copyOfRecords, dataBlockRead.getRecords(),
@@ -366,7 +375,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -377,7 +386,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<IndexedRecord> copyOfRecords2 = records2.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = new HoodieAvroDataBlock(records2, header);
+    dataBlock = getDataBlock(records2, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -389,14 +398,14 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<IndexedRecord> copyOfRecords3 = records3.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = new HoodieAvroDataBlock(records3, header);
+    dataBlock = getDataBlock(records3, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
     Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     HoodieLogBlock nextBlock = reader.next();
-    HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock;
+    HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
     assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(),
         "Read records size should be equal to the written records size");
     assertEquals(copyOfRecords1, dataBlockRead.getRecords(),
@@ -405,7 +414,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     reader.hasNext();
     nextBlock = reader.next();
-    dataBlockRead = (HoodieAvroDataBlock) nextBlock;
+    dataBlockRead = (HoodieDataBlock) nextBlock;
     assertEquals(copyOfRecords2.size(), dataBlockRead.getRecords().size(),
         "Read records size should be equal to the written records size");
     assertEquals(copyOfRecords2, dataBlockRead.getRecords(),
@@ -413,7 +422,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     reader.hasNext();
     nextBlock = reader.next();
-    dataBlockRead = (HoodieAvroDataBlock) nextBlock;
+    dataBlockRead = (HoodieDataBlock) nextBlock;
     assertEquals(copyOfRecords3.size(), dataBlockRead.getRecords().size(),
         "Read records size should be equal to the written records size");
     assertEquals(copyOfRecords3, dataBlockRead.getRecords(),
@@ -443,7 +452,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
           .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
       allRecords.add(copyOfRecords1);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+      HoodieDataBlock dataBlock = getDataBlock(records1, header);
       writer = writer.appendBlock(dataBlock);
     }
     writer.close();
@@ -472,7 +481,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    HoodieDataBlock dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -498,7 +507,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                     .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
     records = SchemaTestUtil.generateTestRecords(0, 10);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = new HoodieAvroDataBlock(records, header);
+    dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -536,7 +545,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
     records = SchemaTestUtil.generateTestRecords(0, 100);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = new HoodieAvroDataBlock(records, header);
+    dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -574,7 +583,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
 
     // Write 2
@@ -582,7 +591,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<IndexedRecord> copyOfRecords2 = records2.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    dataBlock = new HoodieAvroDataBlock(records2, header);
+    dataBlock = getDataBlock(records2, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -621,14 +630,14 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
 
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
 
     // Write 2
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
     List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    dataBlock = new HoodieAvroDataBlock(records2, header);
+    dataBlock = getDataBlock(records2, header);
     writer = writer.appendBlock(dataBlock);
 
     // Rollback the last write
@@ -644,7 +653,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<IndexedRecord> copyOfRecords3 = records3.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    dataBlock = new HoodieAvroDataBlock(records3, header);
+    dataBlock = getDataBlock(records3, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -681,7 +690,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -723,7 +732,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
 
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    dataBlock = new HoodieAvroDataBlock(records3, header);
+    dataBlock = getDataBlock(records3, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -761,7 +770,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
 
     // Write 2
@@ -769,7 +778,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords2 = records2.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
-    dataBlock = new HoodieAvroDataBlock(records2, header);
+    dataBlock = getDataBlock(records2, header);
     writer = writer.appendBlock(dataBlock);
 
     copyOfRecords1.addAll(copyOfRecords2);
@@ -849,13 +858,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
 
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
 
     // Write 2
     List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    dataBlock = new HoodieAvroDataBlock(records2, header);
+    dataBlock = getDataBlock(records2, header);
     writer = writer.appendBlock(dataBlock);
 
     // Delete 50 keys
@@ -916,7 +925,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
 
     // Delete 50 keys
@@ -958,7 +967,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
 
     // Write invalid rollback for a failed write (possible for in-flight commits)
@@ -1000,7 +1009,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
     writer = writer.appendBlock(dataBlock);
     writer = writer.appendBlock(dataBlock);
@@ -1047,7 +1056,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
     writer = writer.appendBlock(dataBlock);
     writer = writer.appendBlock(dataBlock);
@@ -1149,7 +1158,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records.subList(0, numRecordsInLog1), header);
+      HoodieDataBlock dataBlock = getDataBlock(records.subList(0, numRecordsInLog1), header);
       writer = writer.appendBlock(dataBlock);
       // Get the size of the block
       long size = writer.getCurrentSize();
@@ -1163,7 +1172,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       Map<HoodieLogBlock.HeaderMetadataType, String> header2 = new HashMap<>();
       header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
       header2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      HoodieAvroDataBlock dataBlock2 = new HoodieAvroDataBlock(records2.subList(0, numRecordsInLog2), header2);
+      HoodieDataBlock dataBlock2 = getDataBlock(records2.subList(0, numRecordsInLog2), header2);
       writer2 = writer2.appendBlock(dataBlock2);
       // Get the size of the block
       writer2.close();
@@ -1227,7 +1236,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -1237,7 +1246,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<IndexedRecord> records2 = SchemaTestUtil.generateTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords2 = records2.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
-    dataBlock = new HoodieAvroDataBlock(records2, header);
+    dataBlock = getDataBlock(records2, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -1248,7 +1257,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<IndexedRecord> records3 = SchemaTestUtil.generateTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords3 = records3.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
-    dataBlock = new HoodieAvroDataBlock(records3, header);
+    dataBlock = getDataBlock(records3, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -1257,7 +1266,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     assertTrue(reader.hasPrev(), "Last block should be available");
     HoodieLogBlock prevBlock = reader.prev();
-    HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) prevBlock;
+    HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
 
     assertEquals(copyOfRecords3.size(), dataBlockRead.getRecords().size(),
         "Third records size should be equal to the written records size");
@@ -1266,7 +1275,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     assertTrue(reader.hasPrev(), "Second block should be available");
     prevBlock = reader.prev();
-    dataBlockRead = (HoodieAvroDataBlock) prevBlock;
+    dataBlockRead = (HoodieDataBlock) prevBlock;
     assertEquals(copyOfRecords2.size(), dataBlockRead.getRecords().size(),
         "Read records size should be equal to the written records size");
     assertEquals(copyOfRecords2, dataBlockRead.getRecords(),
@@ -1274,7 +1283,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     assertTrue(reader.hasPrev(), "First block should be available");
     prevBlock = reader.prev();
-    dataBlockRead = (HoodieAvroDataBlock) prevBlock;
+    dataBlockRead = (HoodieDataBlock) prevBlock;
     assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(),
         "Read records size should be equal to the written records size");
     assertEquals(copyOfRecords1, dataBlockRead.getRecords(),
@@ -1296,7 +1305,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    HoodieDataBlock dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -1322,7 +1331,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
     records = SchemaTestUtil.generateTestRecords(0, 100);
-    dataBlock = new HoodieAvroDataBlock(records, header);
+    dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -1332,7 +1341,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     assertTrue(reader.hasPrev(), "Last block should be available");
     HoodieLogBlock block = reader.prev();
-    assertTrue(block instanceof HoodieAvroDataBlock, "Last block should be datablock");
+    assertTrue(block instanceof HoodieDataBlock, "Last block should be datablock");
 
     assertTrue(reader.hasPrev(), "Last block should be available");
     assertThrows(CorruptedLogFileException.class, () -> {
@@ -1355,7 +1364,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+    HoodieDataBlock dataBlock = getDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -1363,7 +1372,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
     List<IndexedRecord> records2 = SchemaTestUtil.generateTestRecords(0, 100);
-    dataBlock = new HoodieAvroDataBlock(records2, header);
+    dataBlock = getDataBlock(records2, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -1372,7 +1381,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
     List<IndexedRecord> records3 = SchemaTestUtil.generateTestRecords(0, 100);
-    dataBlock = new HoodieAvroDataBlock(records3, header);
+    dataBlock = getDataBlock(records3, header);
     writer = writer.appendBlock(dataBlock);
     writer.close();
 
@@ -1388,7 +1397,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     // After moving twice, this last reader.prev() should read the First block written
     assertTrue(reader.hasPrev(), "First block should be available");
     HoodieLogBlock prevBlock = reader.prev();
-    HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) prevBlock;
+    HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
     assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(),
         "Read records size should be equal to the written records size");
     assertEquals(copyOfRecords1, dataBlockRead.getRecords(),
@@ -1429,4 +1438,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       assertEquals(recordsCopy.get(i), readRecords.get(i));
     }
   }
+
+  private HoodieDataBlock getDataBlock(List<IndexedRecord> records, Map<HeaderMetadataType, String> header) {
+    switch (dataBlockType) {
+      case AVRO_DATA_BLOCK:
+        return new HoodieAvroDataBlock(records, header);
+      default:
+        throw new RuntimeException("Unknown data block type " + dataBlockType);
+    }
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index a7ef208..caae246 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -120,21 +121,41 @@ public class HoodieTestUtils {
     return init(getDefaultHadoopConf(), basePath, tableType);
   }
 
+  public static HoodieTableMetaClient init(String basePath, HoodieFileFormat baseFileFormat) throws IOException {
+    return init(getDefaultHadoopConf(), basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
+  }
+
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath) throws IOException {
     return init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType)
       throws IOException {
-    return init(hadoopConf, basePath, tableType, RAW_TRIPS_TEST_NAME);
+    return init(hadoopConf, basePath, tableType, new Properties());
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
-      String tableName) throws IOException {
+                                           String tableName)
+      throws IOException {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
-    properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
-    properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
+    return init(hadoopConf, basePath, tableType, properties);
+  }
+
+  public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                           HoodieFileFormat baseFileFormat)
+      throws IOException {
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, baseFileFormat.toString());
+    return init(hadoopConf, basePath, tableType, properties);
+  }
+
+  public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                           Properties properties)
+      throws IOException {
+    properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
+    properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
+    properties.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
   }
 
@@ -442,12 +463,31 @@ public class HoodieTestUtils {
     });
   }
 
+  // TODO: remove this overload; callers should pass the base file extension explicitly
   public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath) throws IOException {
+    return listAllDataFilesInPath(fs, basePath, ".parquet");
+  }
+
+  public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath, String datafileExtension)
+      throws IOException {
+    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
+    List<FileStatus> returns = new ArrayList<>();
+    while (itr.hasNext()) {
+      LocatedFileStatus status = itr.next();
+      if (status.getPath().getName().contains(datafileExtension)) {
+        returns.add(status);
+      }
+    }
+    return returns.toArray(new FileStatus[returns.size()]);
+  }
+
+  public static FileStatus[] listAllLogFilesInPath(FileSystem fs, String basePath, String logfileExtension)
+      throws IOException {
     RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
     List<FileStatus> returns = new ArrayList<>();
     while (itr.hasNext()) {
       LocatedFileStatus status = itr.next();
-      if (status.getPath().getName().contains(".parquet")) {
+      if (status.getPath().getName().contains(logfileExtension)) {
         returns.add(status);
       }
     }
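
With these overloads, tests can pin the base file format when initializing a table; a small sketch (the base path is illustrative):

    HoodieTableMetaClient metaClient =
        HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), "/tmp/hoodie-test-table",
            HoodieTableType.MERGE_ON_READ, HoodieFileFormat.PARQUET);
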
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/utils/TestParquetReaderIterator.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
similarity index 98%
rename from hudi-client/src/test/java/org/apache/hudi/client/utils/TestParquetReaderIterator.java
rename to hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
index f20c5f9..799ed24 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/utils/TestParquetReaderIterator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.client.utils;
+package org.apache.hudi.common.util;
 
 import org.apache.hudi.exception.HoodieIOException;
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java
new file mode 100644
index 0000000..13971d5
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieFileReaderFactory}.
+ */
+public class TestHoodieFileReaderFactory {
+  @TempDir
+  public java.nio.file.Path tempDir;
+
+  @Test
+  public void testGetFileReader() throws IOException {
+    // parquet file format.
+    final Configuration hadoopConf = new Configuration();
+    final Path parquetPath = new Path("/partition/path/f1_1-0-1_000.parquet");
+    HoodieFileReader<IndexedRecord> parquetReader = HoodieFileReaderFactory.getFileReader(hadoopConf, parquetPath);
+    assertTrue(parquetReader instanceof HoodieParquetReader);
+
+    // other file format exception.
+    final Path logPath = new Path("/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
+    final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
+      HoodieFileReader<IndexedRecord> logWriter = HoodieFileReaderFactory.getFileReader(hadoopConf, logPath);
+    }, "should fail since log storage reader is not supported yet.");
+    assertTrue(thrown.getMessage().contains("format not supported yet."));
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 4db928c..266cc6f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop;
 
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -101,7 +102,8 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
       FileStatus[] fileStatuses = super.listStatus(job);
       Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
-          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
+          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses,
+              HoodieFileFormat.PARQUET.getFileExtension(), tableMetaClientMap.values());
       LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
       for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
         List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index fd7ffb2..a88d152 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -24,7 +24,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
-
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -952,7 +952,11 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         ValidationUtils.checkArgument(split instanceof HoodieCombineRealtimeFileSplit, "Only "
             + HoodieCombineRealtimeFileSplit.class.getName() + " allowed, found " + split.getClass().getName());
         for (InputSplit inputSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) {
-          recordReaders.add(new HoodieParquetRealtimeInputFormat().getRecordReader(inputSplit, job, reporter));
+          if (split.getPaths().length == 0) {
+            continue;
+          }
+          FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(split.getPath(0).toString(), true, job);
+          recordReaders.add(inputFormat.getRecordReader(inputSplit, job, reporter));
         }
         return new HoodieCombineRealtimeRecordReader(job, split, recordReaders);
       }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index b7da749..3758b9b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -50,7 +48,6 @@ public abstract class AbstractRealtimeRecordReader {
 
   protected final HoodieRealtimeFileSplit split;
   protected final JobConf jobConf;
-  private final MessageType baseFileSchema;
   protected final boolean usesCustomPayload;
   // Schema handles
   private Schema readerSchema;
@@ -66,7 +63,6 @@ public abstract class AbstractRealtimeRecordReader {
     try {
       this.usesCustomPayload = usesCustomPayload();
       LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
-      baseFileSchema = HoodieRealtimeRecordReaderUtils.readSchema(jobConf, split.getPath());
       init();
     } catch (IOException e) {
       throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
@@ -88,7 +84,7 @@ public abstract class AbstractRealtimeRecordReader {
     Schema schemaFromLogFile =
         LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
     if (schemaFromLogFile == null) {
-      writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
+      writerSchema = HoodieRealtimeRecordReaderUtils.readSchema(jobConf, split.getPath());
       LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
     } else {
       writerSchema = schemaFromLogFile;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index d10b664..6d967f3 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -18,8 +18,10 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -30,11 +32,15 @@ import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIOException;
-
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.LogManager;
@@ -61,6 +67,54 @@ public class HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);
 
+  public static FileInputFormat getInputFormat(HoodieFileFormat baseFileFormat, boolean realtime, Configuration conf) {
+    switch (baseFileFormat) {
+      case PARQUET:
+        if (realtime) {
+          HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
+          inputFormat.setConf(conf);
+          return inputFormat;
+        } else {
+          HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
+          inputFormat.setConf(conf);
+          return inputFormat;
+        }
+      default:
+        throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
+    }
+  }
+
+  public static String getInputFormatClassName(HoodieFileFormat baseFileFormat, boolean realtime, Configuration conf) {
+    FileInputFormat inputFormat = getInputFormat(baseFileFormat, realtime, conf);
+    return inputFormat.getClass().getName();
+  }
+
+  public static String getOutputFormatClassName(HoodieFileFormat baseFileFormat) {
+    switch (baseFileFormat) {
+      case PARQUET:
+        return MapredParquetOutputFormat.class.getName();
+      default:
+        throw new HoodieIOException("No OutputFormat for base file format " + baseFileFormat);
+    }
+  }
+
+  public static String getSerDeClassName(HoodieFileFormat baseFileFormat) {
+    switch (baseFileFormat) {
+      case PARQUET:
+        return ParquetHiveSerDe.class.getName();
+      default:
+        throw new HoodieIOException("No SerDe for base file format " + baseFileFormat);
+    }
+  }
+
+  public static FileInputFormat getInputFormat(String path, boolean realtime, Configuration conf) {
+    final String extension = FSUtils.getFileExtension(path);
+    if (extension.equals(HoodieFileFormat.PARQUET.getFileExtension())) {
+      return getInputFormat(HoodieFileFormat.PARQUET, realtime, conf);
+    }
+    throw new HoodieIOException("Hoodie InputFormat not implemented for base file of type " + extension);
+  }
+
   /**
    * Filter any specific instants that we do not want to process.
    * example timeline:
@@ -255,19 +309,20 @@ public class HoodieInputFormatUtils {
    * Takes in a list of filesStatus and a list of table metadatas. Groups the files status list
    * based on given table metadata.
    * @param fileStatuses
+   * @param fileExtension
    * @param metaClientList
    * @return
    * @throws IOException
    */
   public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
-      FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) {
+      FileStatus[] fileStatuses, String fileExtension, Collection<HoodieTableMetaClient> metaClientList) {
     // This assumes the paths for different tables are grouped together
     Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
     HoodieTableMetaClient metadata = null;
     for (FileStatus status : fileStatuses) {
       Path inputPath = status.getPath();
-      if (!inputPath.getName().endsWith(".parquet")) {
-        //FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
+      if (!inputPath.getName().endsWith(fileExtension)) {
+        //FIXME(vc): skip non-data files for now. This won't be needed once log file names start
         // with "."
         continue;
       }
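
These helpers centralize the mapping from a base file format to the Hive input format, output format and SerDe classes that callers previously hardcoded. A minimal sketch of resolving them, with an illustrative path and PARQUET as the only format implemented so far:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hudi.common.model.HoodieFileFormat;
    import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

    public class InputFormatResolutionExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Resolve the snapshot (non-realtime) input format for PARQUET base files.
        FileInputFormat snapshotFormat =
            HoodieInputFormatUtils.getInputFormat(HoodieFileFormat.PARQUET, false, conf);

        // Or resolve the realtime input format from a concrete file path instead of the enum.
        FileInputFormat realtimeFormat =
            HoodieInputFormatUtils.getInputFormat("/tmp/part/f1_1-0-1_000.parquet", true, conf);

        // The class-name helpers back Hive table registration (input/output format and SerDe).
        System.out.println(HoodieInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, false, conf));
        System.out.println(HoodieInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET));
        System.out.println(HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET));
      }
    }

Passing any other format throws HoodieIOException until its mapping is added to the switch statements above.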
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index cd876b4..cec77f9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -22,7 +22,8 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
@@ -40,8 +41,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -57,14 +56,14 @@ import java.util.stream.Collectors;
 public class HoodieRealtimeRecordReaderUtils {
 
   /**
-   * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the twitter parquet to
-   * support hive 1.1.0
+   * Reads the schema from the base file.
    */
-  public static MessageType readSchema(Configuration conf, Path parquetFilePath) {
+  public static Schema readSchema(Configuration conf, Path filePath) {
     try {
-      return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema();
+      HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
+      return storageReader.getSchema();
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e);
+      throw new HoodieIOException("Failed to read schema from " + filePath, e);
     }
   }
 
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 6e413cc..918aade 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -59,6 +60,8 @@ public class TestHoodieParquetInputFormat {
 
   private HoodieParquetInputFormat inputFormat;
   private JobConf jobConf;
+  private final HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
+  private final String baseFileExtension = baseFileFormat.getFileExtension();
 
   public static void ensureFilesInCommit(String msg, FileStatus[] files, String commit, int expected) {
     int count = 0;
@@ -145,7 +148,7 @@ public class TestHoodieParquetInputFormat {
   @Test
   public void testInputFormatLoad() throws IOException {
     // initial commit
-    File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
     InputFormatTestUtil.commit(basePath, "100");
 
     // Add the paths
@@ -161,7 +164,7 @@ public class TestHoodieParquetInputFormat {
   @Test
   public void testInputFormatUpdates() throws IOException {
     // initial commit
-    File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
     InputFormatTestUtil.commit(basePath, "100");
 
     // Add the paths
@@ -171,7 +174,7 @@ public class TestHoodieParquetInputFormat {
     assertEquals(10, files.length);
 
     // update files
-    InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", true);
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 5, "200", true);
     // Before the commit
     files = inputFormat.listStatus(jobConf);
     assertEquals(10, files.length);
@@ -188,7 +191,7 @@ public class TestHoodieParquetInputFormat {
   @Test
   public void testInputFormatWithCompaction() throws IOException {
     // initial commit
-    File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
     InputFormatTestUtil.commit(basePath, "100");
 
     // Add the paths
@@ -204,7 +207,7 @@ public class TestHoodieParquetInputFormat {
     createCompactionFile(basePath, "125");
 
     // add inserts after compaction timestamp
-    InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 5, "200");
+    InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId2", 5, "200");
     InputFormatTestUtil.commit(basePath, "200");
 
     // verify snapshot reads show all new inserts even though there is pending compaction
@@ -221,7 +224,7 @@ public class TestHoodieParquetInputFormat {
   @Test
   public void testIncrementalSimple() throws IOException {
     // initial commit
-    File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
     createCommitFile(basePath, "100", "2016/05/01");
 
     // Add the paths
@@ -266,25 +269,25 @@ public class TestHoodieParquetInputFormat {
   @Test
   public void testIncrementalWithMultipleCommits() throws IOException {
     // initial commit
-    File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
     createCommitFile(basePath, "100", "2016/05/01");
 
     // Add the paths
     FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
     // update files
-    InputFormatTestUtil.simulateUpdates(partitionDir, "100", 5, "200", false);
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 5, "200", false);
     createCommitFile(basePath, "200", "2016/05/01");
 
-    InputFormatTestUtil.simulateUpdates(partitionDir, "100", 4, "300", false);
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 4, "300", false);
     createCommitFile(basePath, "300", "2016/05/01");
 
-    InputFormatTestUtil.simulateUpdates(partitionDir, "100", 3, "400", false);
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 3, "400", false);
     createCommitFile(basePath, "400", "2016/05/01");
 
-    InputFormatTestUtil.simulateUpdates(partitionDir, "100", 2, "500", false);
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 2, "500", false);
     createCommitFile(basePath, "500", "2016/05/01");
 
-    InputFormatTestUtil.simulateUpdates(partitionDir, "100", 1, "600", false);
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 1, "600", false);
     createCommitFile(basePath, "600", "2016/05/01");
 
     InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
@@ -364,14 +367,14 @@ public class TestHoodieParquetInputFormat {
   @Test
   public void testIncrementalWithPendingCompaction() throws IOException {
     // initial commit
-    File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
     createCommitFile(basePath, "100", "2016/05/01");
 
     // simulate compaction requested at 300
     File compactionFile = createCompactionFile(basePath, "300");
 
     // write inserts into new bucket
-    InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 10, "400");
+    InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId2", 10, "400");
     createCommitFile(basePath, "400", "2016/05/01");
 
     // Add the paths
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 05669bb..0201fac 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -20,8 +20,10 @@ package org.apache.hudi.hadoop.testutils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -59,25 +61,29 @@ public class InputFormatTestUtil {
 
   private static String TEST_WRITE_TOKEN = "1-0-1";
 
-  public static File prepareTable(java.nio.file.Path basePath, int numberOfFiles, String commitNumber)
+  public static File prepareTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
+                                  String commitNumber)
       throws IOException {
-    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString());
+    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+        baseFileFormat);
     java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
     Files.createDirectories(partitionPath);
-    return simulateInserts(partitionPath.toFile(), "fileId1", numberOfFiles, commitNumber);
+    return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
+        commitNumber);
   }
 
-  public static File simulateInserts(File partitionPath, String fileId, int numberOfFiles, String commitNumber)
+  public static File simulateInserts(File partitionPath, String baseFileExtension, String fileId, int numberOfFiles,
+                                     String commitNumber)
       throws IOException {
     for (int i = 0; i < numberOfFiles; i++) {
       Files.createFile(partitionPath.toPath()
-          .resolve(FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, fileId + i)));
+          .resolve(FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, fileId + i, baseFileExtension)));
     }
     return partitionPath;
   }
 
-  public static void simulateUpdates(File directory, final String originalCommit, int numberOfFilesUpdated,
-      String newCommit, boolean randomize) throws IOException {
+  public static void simulateUpdates(File directory, String baseFileExtension, final String originalCommit,
+                                     int numberOfFilesUpdated, String newCommit, boolean randomize) throws IOException {
     List<File> dataFiles = Arrays.asList(Objects.requireNonNull(directory.listFiles((dir, name) -> {
       String commitTs = FSUtils.getCommitTime(name);
       return originalCommit.equals(commitTs);
@@ -88,7 +94,8 @@ public class InputFormatTestUtil {
     List<File> toUpdateList = dataFiles.subList(0, Math.min(numberOfFilesUpdated, dataFiles.size()));
     for (File file : toUpdateList) {
       String fileId = FSUtils.getFileId(file.getName());
-      Files.createFile(directory.toPath().resolve(FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId)));
+      Files.createFile(directory.toPath().resolve(FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId,
+          baseFileExtension)));
     }
   }
 
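With the new parameters, a test picks the base file format once and threads its extension through the simulate helpers instead of assuming ".parquet". A minimal sketch of the updated call shapes (temp directory, file counts and commit times are arbitrary):

    import java.io.File;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.apache.hudi.common.model.HoodieFileFormat;
    import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;

    public class InputFormatTestUtilUsage {
      public static void main(String[] args) throws Exception {
        Path basePath = Files.createTempDirectory("hudi-test");
        HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
        String extension = baseFileFormat.getFileExtension();

        // Initializes a COPY_ON_WRITE table with the chosen base file format and 10 files at commit "100".
        File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");

        // Inserts and updates now take the base file extension explicitly.
        InputFormatTestUtil.simulateInserts(partitionDir, extension, "fileId2", 5, "200");
        InputFormatTestUtil.simulateUpdates(partitionDir, extension, "100", 5, "200", true);
      }
    }
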
diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 37f4463..00a35aa 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -35,6 +35,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--table"}, description = "name of the target table in Hive", required = true)
   public String tableName;
 
+  @Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")
+  public String baseFileFormat = "PARQUET";
+
   @Parameter(names = {"--user"}, description = "Hive username", required = true)
   public String hiveUser;
 
diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index c3849d7..877ba47 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -19,10 +19,10 @@
 package org.apache.hudi.hive;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.InvalidTableException;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
@@ -32,8 +32,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -146,21 +144,24 @@ public class HiveSyncTool {
     // Check and sync schema
     if (!tableExists) {
       LOG.info("Hive table " + tableName + " is not found. Creating it");
-      if (!useRealTimeInputFormat) {
-        String inputFormatClassName = cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.HoodieInputFormat.class.getName()
-            : HoodieParquetInputFormat.class.getName();
-        hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
-            ParquetHiveSerDe.class.getName());
-      } else {
-        // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
-        // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
-        // /ql/exec/DDLTask.java#L3488
-        String inputFormatClassName =
-            cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
-                : HoodieParquetRealtimeInputFormat.class.getName();
-        hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
-            ParquetHiveSerDe.class.getName());
+      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(cfg.baseFileFormat.toUpperCase());
+      String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat,
+          new Configuration());
+
+      if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && cfg.usePreApacheInputFormat) {
+        // Parquet input format had an InputFormat class visible under the old naming scheme.
+        inputFormatClassName = useRealTimeInputFormat
+            ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
+            : com.uber.hoodie.hadoop.HoodieInputFormat.class.getName();
       }
+
+      String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
+      String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+
+      // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
+      // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
+      // /ql/exec/DDLTask.java#L3488
+      hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, outputFormatClassName, serDeFormatClassName);
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
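
The same choice can be made programmatically by setting the new baseFileFormat field on HiveSyncConfig before running the tool. The sketch below mirrors the --base-file-format flag; connection details and table names are placeholders, so treat it as an outline rather than a ready-to-run job:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hudi.hive.HiveSyncConfig;
    import org.apache.hudi.hive.HiveSyncTool;

    public class HiveSyncWithBaseFileFormat {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        HiveSyncConfig cfg = new HiveSyncConfig();
        cfg.databaseName = "default";                   // placeholder values below
        cfg.tableName = "stock_ticks_cow";
        cfg.basePath = "/user/hive/warehouse/stock_ticks_cow";
        cfg.baseFileFormat = "PARQUET";                 // the new knob; HFILE is the other documented value
        cfg.hiveUser = "hive";
        cfg.hivePass = "hive";
        cfg.jdbcUrl = "jdbc:hive2://hiveserver:10000";

        new HiveSyncTool(cfg, new HiveConf(conf, HiveConf.class), fs).syncHoodieTable();
      }
    }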
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 3af302f..f069a25 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.integ;
 
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -67,6 +68,8 @@ public class ITTestHoodieDemo extends ITTestBase {
   private static final String HIVE_INCREMENTAL_MOR_RO_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-ro.commands";
   private static final String HIVE_INCREMENTAL_MOR_RT_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-rt.commands";
 
+  private static HoodieFileFormat baseFileFormat;
+
   private static String HIVE_SYNC_CMD_FMT =
       " --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
           + " --hoodie-conf hoodie.datasource.hive_sync.username=hive "
@@ -76,7 +79,9 @@ public class ITTestHoodieDemo extends ITTestBase {
           + " --hoodie-conf hoodie.datasource.hive_sync.table=%s";
 
   @Test
-  public void testDemo() throws Exception {
+  public void testParquetDemo() throws Exception {
+    baseFileFormat = HoodieFileFormat.PARQUET;
+
     setupDemo();
 
     // batch 1
@@ -122,6 +127,7 @@ public class ITTestHoodieDemo extends ITTestBase {
     List<String> cmds = CollectionUtils.createImmutableList(
         "spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
             + " --table-type COPY_ON_WRITE "
+            + " --base-file-format " + baseFileFormat.toString()
             + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
             + " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME
             + " --props /var/demo/config/dfs-source.properties"
@@ -130,12 +136,14 @@ public class ITTestHoodieDemo extends ITTestBase {
             + " --database default"
             + " --table " + COW_TABLE_NAME
             + " --base-path " + COW_BASE_PATH
+            + " --base-file-format " + baseFileFormat.toString()
             + " --user hive"
             + " --pass hive"
             + " --jdbc-url jdbc:hive2://hiveserver:10000"
             + " --partitioned-by dt",
         ("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
             + " --table-type MERGE_ON_READ "
+            + " --base-file-format " + baseFileFormat.toString()
             + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
             + " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME
             + " --props /var/demo/config/dfs-source.properties"
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index e45bd65..0a915c1 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -270,7 +270,7 @@ public class DataSourceUtils {
     return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
   }
 
-  public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) {
+  public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath, String baseFileFormat) {
     checkRequiredProperties(props, Collections.singletonList(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY()));
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
     hiveSyncConfig.basePath = basePath;
@@ -280,6 +280,7 @@ public class DataSourceUtils {
     hiveSyncConfig.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(),
         DataSourceWriteOptions.DEFAULT_HIVE_DATABASE_OPT_VAL());
     hiveSyncConfig.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
+    hiveSyncConfig.baseFileFormat = baseFileFormat;
     hiveSyncConfig.hiveUser =
         props.getString(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), DataSourceWriteOptions.DEFAULT_HIVE_USER_OPT_VAL());
     hiveSyncConfig.hivePass =
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 3d1172f..5195f05 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -257,6 +257,7 @@ object DataSourceWriteOptions {
   val HIVE_SYNC_ENABLED_OPT_KEY = "hoodie.datasource.hive_sync.enable"
   val HIVE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.database"
   val HIVE_TABLE_OPT_KEY = "hoodie.datasource.hive_sync.table"
+  val HIVE_BASE_FILE_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.base_file_format"
   val HIVE_USER_OPT_KEY = "hoodie.datasource.hive_sync.username"
   val HIVE_PASS_OPT_KEY = "hoodie.datasource.hive_sync.password"
   val HIVE_URL_OPT_KEY = "hoodie.datasource.hive_sync.jdbcurl"
@@ -270,6 +271,7 @@ object DataSourceWriteOptions {
   val DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL = "false"
   val DEFAULT_HIVE_DATABASE_OPT_VAL = "default"
   val DEFAULT_HIVE_TABLE_OPT_VAL = "unknown"
+  val DEFAULT_HIVE_BASE_FILE_FORMAT_OPT_VAL = "PARQUET"
   val DEFAULT_HIVE_USER_OPT_VAL = "hive"
   val DEFAULT_HIVE_PASS_OPT_VAL = "hive"
   val DEFAULT_HIVE_URL_OPT_VAL = "jdbc:hive2://localhost:10000"
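
On the Spark datasource path the new key rides along with the other hive-sync options. A hedged sketch of a write that sets it explicitly; the table name, field names and target path are placeholders, and the hive-sync options only take effect against a reachable Hive metastore:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class HiveSyncBaseFileFormatOption {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("hudi-hive-sync-option")
            .master("local[1]")
            .getOrCreate();

        // Tiny one-row frame just to have something to write.
        Dataset<Row> df = spark.sql(
            "select '1' as uuid, 'a' as name, 1 as ts, '2016/05/01' as partitionpath");

        df.write().format("org.apache.hudi")
            .option("hoodie.table.name", "hive_sync_demo")
            .option("hoodie.datasource.write.recordkey.field", "uuid")
            .option("hoodie.datasource.write.precombine.field", "ts")
            .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
            .option("hoodie.datasource.hive_sync.enable", "true")
            .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hiveserver:10000")
            // New option introduced by this change; defaults to PARQUET when unset.
            .option("hoodie.datasource.hive_sync.base_file_format", "PARQUET")
            .mode(SaveMode.Overwrite)
            .save("/tmp/hive_sync_demo");

        spark.stop();
      }
    }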
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 88cef8f..5774c89 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -212,6 +212,7 @@ private[hudi] object HoodieSparkSqlWriter {
       HIVE_SYNC_ENABLED_OPT_KEY -> DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL,
       HIVE_DATABASE_OPT_KEY -> DEFAULT_HIVE_DATABASE_OPT_VAL,
       HIVE_TABLE_OPT_KEY -> DEFAULT_HIVE_TABLE_OPT_VAL,
+      HIVE_BASE_FILE_FORMAT_OPT_KEY -> DEFAULT_HIVE_BASE_FILE_FORMAT_OPT_VAL,
       HIVE_USER_OPT_KEY -> DEFAULT_HIVE_USER_OPT_VAL,
       HIVE_PASS_OPT_KEY -> DEFAULT_HIVE_PASS_OPT_VAL,
       HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL,
@@ -239,6 +240,7 @@ private[hudi] object HoodieSparkSqlWriter {
   private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig = {
     val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
     hiveSyncConfig.basePath = basePath.toString
+    hiveSyncConfig.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY)
     hiveSyncConfig.usePreApacheInputFormat =
       parameters.get(HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY).exists(r => r.toBoolean)
     hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 068e592..0d3e90c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -205,7 +205,7 @@ public class DeltaSync implements Serializable {
     } else {
       this.commitTimelineOpt = Option.empty();
       HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
-          cfg.tableType, cfg.targetTableName, "archived", cfg.payloadClassName);
+          cfg.tableType, cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat);
     }
   }
 
@@ -274,7 +274,7 @@ public class DeltaSync implements Serializable {
       }
     } else {
       HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
-          cfg.tableType, cfg.targetTableName, "archived", cfg.payloadClassName);
+          cfg.tableType, cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat);
     }
 
     if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) {
@@ -474,7 +474,7 @@ public class DeltaSync implements Serializable {
    */
   private void syncHive() {
     if (cfg.enableHiveSync) {
-      HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
+      HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat);
       LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
           + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
       new HiveSyncTool(hiveSyncConfig, new HiveConf(conf, HiveConf.class), fs).syncHoodieTable();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a569c4f..a3d81fa 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -177,6 +177,9 @@ public class HoodieDeltaStreamer implements Serializable {
     @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
     public String tableType;
 
+    @Parameter(names = {"--base-file-format"}, description = "File format for the base files. PARQUET (or) HFILE", required = false)
+    public String baseFileFormat;
+
     @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
         + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
         + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
@@ -379,8 +382,20 @@ public class HoodieDeltaStreamer implements Serializable {
         // This will guarantee there is no surprise with table type
         ValidationUtils.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.tableType)),
             "Hoodie table is of type " + tableType + " but passed in CLI argument is " + cfg.tableType);
+
+        // Load base file format
+        // This will guarantee there is no surprise with base file type
+        String baseFileFormat = meta.getTableConfig().getBaseFileFormat().toString();
+        ValidationUtils.checkArgument(cfg.baseFileFormat == null || baseFileFormat.equals(cfg.baseFileFormat),
+            "Hoodie table's base file format is of type " + baseFileFormat + " but passed in CLI argument is "
+                + cfg.baseFileFormat);
+        cfg.baseFileFormat = baseFileFormat;
+        this.cfg.baseFileFormat = cfg.baseFileFormat;
       } else {
         tableType = HoodieTableType.valueOf(cfg.tableType);
+        if (cfg.baseFileFormat == null) {
+          cfg.baseFileFormat = "PARQUET"; // default for backward compatibility
+        }
       }
 
       ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT,