You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/04/06 16:11:15 UTC

[hudi] branch master updated: [HUDI-3760] Adding capability to fetch Metadata Records by prefix (#5208)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e87d164b3 [HUDI-3760] Adding capability to fetch Metadata Records by prefix  (#5208)
9e87d164b3 is described below

commit 9e87d164b3f39d795ef70a1f22ba57353d6e198f
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Apr 6 09:11:08 2022 -0700

    [HUDI-3760] Adding capability to fetch Metadata Records by prefix  (#5208)
    
    - Adding capability to fetch Metadata Records by key prefix, so that Data Skipping can fetch only the Column Stats
      Index records pertaining to the columns being queried, instead of reading out the whole Index.
    - Fixed usages of HFileScanner in HFileReader: a few code paths use the cached scanner if available; other code paths use their own HFileScanner w/ positional read.
    
    Brief change log
    - Rebasing ColumnStatsIndexSupport to rely on HoodieBackedTableMetadata in lieu of reading t/h Spark DS
    - Adding methods enabling key-prefix lookups to HoodieFileReader, HoodieHFileReader
    - Wiring key-prefix lookup t/h LogRecordScanner impls
    - Cleaning up HoodieHFileReader impl
    
    Co-authored-by: sivabalan <n....@gmail.com>
    Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
 .../hudi/cli/commands/ArchivedCommitsCommand.java  |   4 +-
 .../apache/hudi/cli/commands/ExportCommand.java    |   2 +-
 .../hudi/cli/commands/HoodieLogFileCommand.java    |   4 +-
 .../apache/hudi/client/HoodieTimelineArchiver.java |   2 +-
 .../apache/hudi/io/storage/HoodieHFileWriter.java  |   2 +-
 .../io/storage/TestHoodieHFileReaderWriter.java    | 118 +++-
 .../io/storage/TestHoodieReaderWriterBase.java     |   3 +-
 .../org/apache/hudi/HoodieConversionUtils.scala    |   8 +
 .../functional/TestHoodieBackedMetadata.java       | 167 ++++-
 .../functional/TestHoodieBackedTableMetadata.java  |  12 +-
 .../hudi/testutils/HoodieClientTestUtils.java      |   4 +-
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |   2 +-
 .../hudi/common/config/HoodieMetadataConfig.java   |   4 +-
 .../table/log/AbstractHoodieLogRecordReader.java   |  66 +-
 .../common/table/log/HoodieLogFormatReader.java    |  12 +-
 .../table/log/HoodieMergedLogRecordScanner.java    |  21 +-
 .../table/log/block/HoodieAvroDataBlock.java       |   2 +-
 .../common/table/log/block/HoodieDataBlock.java    |  28 +-
 .../table/log/block/HoodieHFileDataBlock.java      |  23 +-
 .../table/timeline/HoodieArchivedTimeline.java     |   2 +-
 .../apache/hudi/common/util/CollectionUtils.java   |  13 +
 .../apache/hudi/io/storage/HoodieFileReader.java   |  42 +-
 .../apache/hudi/io/storage/HoodieHFileReader.java  | 712 ++++++++++++---------
 .../apache/hudi/io/storage/HoodieOrcReader.java    |  11 +-
 .../hudi/io/storage/HoodieParquetReader.java       |  19 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    |   2 +-
 .../metadata/FileSystemBackedTableMetadata.java    |   7 +
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 329 +++++++---
 .../HoodieMetadataMergedLogRecordReader.java       |  97 ++-
 .../hudi/metadata/HoodieMetadataPayload.java       |   4 +-
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  13 +
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   8 +-
 .../main/java/org/apache/hudi/util/LazyRef.java    |  64 ++
 .../common/functional/TestHoodieLogFormat.java     |   2 +-
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  | 113 +++-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  10 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  11 +-
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     |  12 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   9 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   6 -
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |   2 +-
 .../sql/hudi/streaming/HoodieStreamSource.scala    |   1 +
 .../org/apache/hudi/TestHoodieFileIndex.scala      |   6 +-
 .../hudi/functional/TestColumnStatsIndex.scala     |  18 +-
 .../hudi/functional/TestLayoutOptimization.scala   |  10 +-
 .../TestMetadataTableWithSparkDataSource.scala     |  16 +-
 46 files changed, 1356 insertions(+), 667 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 1747a59f4f..fcb273f0a7 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -81,7 +81,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
       // read the avro blocks
       while (reader.hasNext()) {
         HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        blk.getRecordItr().forEachRemaining(readRecords::add);
+        blk.getRecordIterator().forEachRemaining(readRecords::add);
       }
       List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
           .filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -155,7 +155,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
       // read the avro blocks
       while (reader.hasNext()) {
         HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
+        try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
           recordItr.forEachRemaining(readRecords::add);
         }
       }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index 1d8d6dcd6a..fa6e15b7af 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -124,7 +124,7 @@ public class ExportCommand implements CommandMarker {
       // read the avro blocks
       while (reader.hasNext() && copyCount < limit) {
         HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
+        try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
           while (recordItr.hasNext()) {
             IndexedRecord ir = recordItr.next();
             // Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 4a56858f39..8d99c410d6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -122,7 +122,7 @@ public class HoodieLogFileCommand implements CommandMarker {
             instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
           }
           if (n instanceof HoodieDataBlock) {
-            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordItr()) {
+            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator()) {
               recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
             }
           }
@@ -236,7 +236,7 @@ public class HoodieLogFileCommand implements CommandMarker {
           HoodieLogBlock n = reader.next();
           if (n instanceof HoodieDataBlock) {
             HoodieDataBlock blk = (HoodieDataBlock) n;
-            try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
+            try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
               recordItr.forEachRemaining(record -> {
                 if (allRecords.size() < limit) {
                   allRecords.add(record);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index b0d473be04..ca76e4e3bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -339,7 +339,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
           // Read the avro blocks
           while (reader.hasNext()) {
             HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-            blk.getRecordItr().forEachRemaining(records::add);
+            blk.getRecordIterator().forEachRemaining(records::add);
             if (records.size() >= this.config.getCommitArchivalBatchSize()) {
               writeToFile(wrapperSchema, records);
             }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index be79f50334..1642eb2c42 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -107,7 +107,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
         .withFileContext(context)
         .create();
 
-    writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
+    writer.appendFileInfo(HoodieHFileReader.SCHEMA_KEY.getBytes(), schema.toString().getBytes());
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
index e1f97949ef..2db8eb0204 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -18,17 +18,6 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
-import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -39,7 +28,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -51,21 +50,25 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
 import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
-import static org.apache.hudi.io.storage.HoodieHFileReader.KEY_SCHEMA;
+import static org.apache.hudi.io.storage.HoodieHFileReader.SCHEMA_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -124,7 +127,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
     FileSystem fs = getFilePath().getFileSystem(conf);
     HFile.Reader hfileReader = HoodieHFileUtils.createHFileReader(fs, getFilePath(), new CacheConfig(conf), conf);
     assertEquals(getSchemaFromResource(TestHoodieHFileReaderWriter.class, schemaPath),
-        new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(KEY_SCHEMA.getBytes()))));
+        new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(SCHEMA_KEY.getBytes()))));
   }
 
   private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
@@ -142,7 +145,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
     Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc");
     HoodieFileWriter<GenericRecord> writer = createWriter(avroSchema, populateMetaFields);
     List<String> keys = new ArrayList<>();
-    Map<String, GenericRecord> recordMap = new HashMap<>();
+    Map<String, GenericRecord> recordMap = new TreeMap<>();
     for (int i = 0; i < 100; i++) {
       GenericRecord record = new GenericData.Record(avroSchema);
       String key = String.format("%s%04d", "key", i);
@@ -163,24 +166,30 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
 
     Configuration conf = new Configuration();
     HoodieHFileReader hoodieHFileReader = (HoodieHFileReader) createReader(conf);
-    List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
-    records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
+    List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
+    assertEquals(new ArrayList<>(recordMap.values()), records);
+
     hoodieHFileReader.close();
 
     for (int i = 0; i < 2; i++) {
       int randomRowstoFetch = 5 + RANDOM.nextInt(10);
       Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
+
       List<String> rowsList = new ArrayList<>(rowsToFetch);
       Collections.sort(rowsList);
-      hoodieHFileReader = (HoodieHFileReader) createReader(conf);
-      List<Pair<String, GenericRecord>> result = hoodieHFileReader.readRecords(rowsList);
-      assertEquals(result.size(), randomRowstoFetch);
+
+      List<GenericRecord> expectedRecords = rowsList.stream().map(recordMap::get).collect(Collectors.toList());
+
+      hoodieHFileReader = (HoodieHFileReader<GenericRecord>) createReader(conf);
+      List<GenericRecord> result = HoodieHFileReader.readRecords(hoodieHFileReader, rowsList);
+
+      assertEquals(expectedRecords, result);
+
       result.forEach(entry -> {
-        assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()));
         if (populateMetaFields && testAvroWithMeta) {
-          assertNotNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+          assertNotNull(entry.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
         } else {
-          assertNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+          assertNull(entry.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
         }
       });
       hoodieHFileReader.close();
@@ -202,7 +211,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
         fs.open(getFilePath()), (int) fs.getFileStatus(getFilePath()).getLen());
     // Reading byte array in HFile format, without actual file path
     HoodieHFileReader<GenericRecord> hfileReader =
-        new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
+        new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
     Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
     assertEquals(NUM_RECORDS, hfileReader.getTotalRecords());
     verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
@@ -217,7 +226,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
         IntStream.concat(IntStream.range(40, NUM_RECORDS * 2), IntStream.range(10, 20))
             .mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toList());
     Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
-    Iterator<GenericRecord> iterator = hfileReader.getRecordIterator(keys, avroSchema);
+    Iterator<GenericRecord> iterator = hfileReader.getRecordsByKeysIterator(keys, avroSchema);
 
     List<Integer> expectedIds =
         IntStream.concat(IntStream.range(40, NUM_RECORDS), IntStream.range(10, 20))
@@ -233,6 +242,59 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
     }
   }
 
+  @Test
+  public void testReaderGetRecordIteratorByKeyPrefixes() throws Exception {
+    writeFileWithSimpleSchema();
+    HoodieHFileReader<GenericRecord> hfileReader =
+        (HoodieHFileReader<GenericRecord>) createReader(new Configuration());
+
+    Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
+
+    List<String> keyPrefixes = Collections.singletonList("key");
+    Iterator<GenericRecord> iterator =
+        hfileReader.getRecordsByKeyPrefixIterator(keyPrefixes, avroSchema);
+
+    List<GenericRecord> recordsByPrefix = toStream(iterator).collect(Collectors.toList());
+
+    List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator()).collect(Collectors.toList());
+
+    assertEquals(allRecords, recordsByPrefix);
+
+    // filter for "key1" : entries from key10 to key19 should be matched
+    List<GenericRecord> expectedKey1s = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key1")).collect(Collectors.toList());
+    iterator =
+        hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key1"), avroSchema);
+    recordsByPrefix =
+        StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+            .collect(Collectors.toList());
+    assertEquals(expectedKey1s, recordsByPrefix);
+
+    // exact match
+    List<GenericRecord> expectedKey25 = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key25")).collect(Collectors.toList());
+    iterator =
+        hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key25"), avroSchema);
+    recordsByPrefix =
+        StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+            .collect(Collectors.toList());
+    assertEquals(expectedKey25, recordsByPrefix);
+
+    // no match. key prefix is beyond entries in file.
+    iterator =
+        hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key99"), avroSchema);
+    recordsByPrefix =
+        StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+            .collect(Collectors.toList());
+    assertEquals(Collections.emptyList(), recordsByPrefix);
+
+    // no match. but keyPrefix is in between the entries found in file.
+    iterator =
+        hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key1234"), avroSchema);
+    recordsByPrefix =
+        StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+            .collect(Collectors.toList());
+    assertEquals(Collections.emptyList(), recordsByPrefix);
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {
       "/hudi_0_9_hbase_1_2_3", "/hudi_0_10_hbase_1_2_3", "/hudi_0_11_hbase_2_4_9"})
@@ -253,7 +315,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
         HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), content),
         hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE);
     HoodieHFileReader<GenericRecord> hfileReader =
-        new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
+        new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
     Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
     assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords());
     verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
@@ -261,7 +323,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
     content = readHFileFromResources(complexHFile);
     verifyHFileReader(HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), content),
         hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE);
-    hfileReader = new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
+    hfileReader = new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
     avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchemaWithUDT.avsc");
     assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords());
     verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
index 19f9b93851..4617eb93a6 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -217,7 +218,7 @@ public abstract class TestHoodieReaderWriterBase {
 
   private void verifyFilterRowKeys(HoodieFileReader<GenericRecord> hoodieReader) {
     Set<String> candidateRowKeys = IntStream.range(40, NUM_RECORDS * 2)
-        .mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toSet());
+        .mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toCollection(TreeSet::new));
     List<String> expectedKeys = IntStream.range(40, NUM_RECORDS)
         .mapToObj(i -> "key" + String.format("%02d", i)).sorted().collect(Collectors.toList());
     assertEquals(expectedKeys, hoodieReader.filterRowKeys(candidateRowKeys)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
index eaaf82182a..547c6aed62 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
@@ -18,6 +18,8 @@
 
 package org.apache.hudi
 
+import org.apache.hudi.common.config.TypedProperties
+
 object HoodieConversionUtils {
 
   def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
@@ -26,4 +28,10 @@ object HoodieConversionUtils {
   def toScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
     if (opt.isPresent) Some(opt.get) else None
 
+  def toProperties(params: Map[String, String]): TypedProperties = {
+    val props = new TypedProperties()
+    params.foreach(kv => props.setProperty(kv._1, kv._2))
+    props
+  }
+
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 35ae6da1cb..aebf046fe5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.functional;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -32,6 +33,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -44,6 +46,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -69,6 +72,8 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -84,6 +89,7 @@ import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
 import org.apache.hudi.metadata.HoodieMetadataMetrics;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -100,7 +106,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -736,12 +741,12 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
     HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()),
         new CacheConfig(context.getHadoopConf().get()));
-    List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
+    List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
       if (populateMetaFields) {
-        assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+        assertNotNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
       } else {
-        assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+        assertNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
       }
     });
   }
@@ -977,12 +982,11 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       }
 
       Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
-
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
-            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordIterator()) {
               recordItr.forEachRemaining(indexRecord -> {
                 final GenericRecord record = (GenericRecord) indexRecord;
                 if (enableMetaFields) {
@@ -1068,15 +1072,15 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(),
         new Path(baseFile.getPath()),
         new CacheConfig(context.getHadoopConf().get()));
-    List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
+    List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
       if (enableMetaFields) {
-        assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+        assertNotNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
       } else {
-        assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+        assertNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
       }
 
-      final String keyInPayload = (String) ((GenericRecord) entry.getSecond())
+      final String keyInPayload = (String) ((GenericRecord) entry)
           .get(HoodieMetadataPayload.KEY_FIELD_NAME);
       assertFalse(keyInPayload.isEmpty());
     });
@@ -1383,6 +1387,139 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     testTableOperationsImpl(engineContext, writeConfig);
   }
 
+  @Test
+  public void testColStatsPrefixLookup() throws IOException {
+    this.tableType = COPY_ON_WRITE;
+    initPath();
+    initSparkContexts("TestHoodieMetadata");
+    initFileSystem();
+    fs.mkdirs(new Path(basePath));
+    initTimelineService();
+    initMetaClient(tableType);
+    initTestDataGenerator();
+    metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    // disable small file handling so that every insert goes to a new file group.
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withRollbackUsingMarkers(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withMetadataIndexColumnStats(true)
+            .enableFullScan(false)
+            .build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+
+      String firstCommit = "0000001";
+      List<HoodieRecord> records = dataGen.generateInserts(firstCommit, 20);
+
+      AtomicInteger counter = new AtomicInteger();
+      List<HoodieRecord> processedRecords = records.stream().map(entry ->
+              new HoodieAvroRecord(new HoodieKey("key1_" + counter.getAndIncrement(), entry.getPartitionPath()), (HoodieRecordPayload) entry.getData()))
+          .collect(Collectors.toList());
+
+      client.startCommitWithTime(firstCommit);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(processedRecords, 1), firstCommit).collect();
+      assertNoWriteErrors(writeStatuses);
+
+      // Write 2 (inserts)
+      String secondCommit = "0000002";
+      client.startCommitWithTime(secondCommit);
+      records = dataGen.generateInserts(secondCommit, 20);
+      AtomicInteger counter1 = new AtomicInteger();
+      processedRecords = records.stream().map(entry ->
+              new HoodieAvroRecord(new HoodieKey("key2_" + counter1.getAndIncrement(), entry.getPartitionPath()), (HoodieRecordPayload) entry.getData()))
+          .collect(Collectors.toList());
+      writeStatuses = client.insert(jsc.parallelize(processedRecords, 1), secondCommit).collect();
+      assertNoWriteErrors(writeStatuses);
+
+      Map<String, Map<String, List<String>>> commitToPartitionsToFiles = new HashMap<>();
+      // populate commit -> partition -> file info to assist in validation of the prefix search results
+      metaClient.getActiveTimeline().getInstants().forEach(entry -> {
+        try {
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+              .fromBytes(metaClient.getActiveTimeline().getInstantDetails(entry).get(), HoodieCommitMetadata.class);
+          String commitTime = entry.getTimestamp();
+          if (!commitToPartitionsToFiles.containsKey(commitTime)) {
+            commitToPartitionsToFiles.put(commitTime, new HashMap<>());
+          }
+          commitMetadata.getPartitionToWriteStats().entrySet()
+              .stream()
+              .forEach(partitionWriteStat -> {
+                String partitionStatName = partitionWriteStat.getKey();
+                List<HoodieWriteStat> writeStats = partitionWriteStat.getValue();
+                String partition = HoodieTableMetadataUtil.getPartition(partitionStatName);
+                if (!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) {
+                  commitToPartitionsToFiles.get(commitTime).put(partition, new ArrayList<>());
+                }
+                writeStats.forEach(writeStat -> commitToPartitionsToFiles.get(commitTime).get(partition).add(writeStat.getPath()));
+              });
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      });
+
+      HoodieTableMetadata tableMetadata = metadata(client);
+      // prefix search for column (_hoodie_record_key)
+      ColumnIndexID columnIndexID = new ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      List<HoodieRecord<HoodieMetadataPayload>> result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString()),
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+
+      // there are 3 partitions in total and 2 commits. total entries should be 6.
+      assertEquals(result.size(), 6);
+      result.forEach(entry -> {
+        //LOG.warn("Prefix search entries just for record key col : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
+      });
+
+      // prefix search for col(_hoodie_record_key) and first partition. only 2 files should be matched
+      PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+      result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+      // 1 partition and 2 commits. total entries should be 2.
+      assertEquals(result.size(), 2);
+      result.forEach(entry -> {
+        // LOG.warn("Prefix search entries for record key col and first partition : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
+        HoodieMetadataColumnStats metadataColumnStats = entry.getData().getColumnStatMetadata().get();
+        String fileName = metadataColumnStats.getFileName();
+        if (fileName.contains(firstCommit)) {
+          assertTrue(commitToPartitionsToFiles.get(firstCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+              .contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
+        } else {
+          assertTrue(commitToPartitionsToFiles.get(secondCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+              .contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
+        }
+      });
+
+      // prefix search for column {commit time} and first partition
+      columnIndexID = new ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+      result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+
+      // 1 partition and 2 commits. total entries should be 2.
+      assertEquals(result.size(), 2);
+      result.forEach(entry -> {
+        // LOG.warn("Prefix search entries for commit time col and first partition : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
+        HoodieMetadataColumnStats metadataColumnStats = entry.getData().getColumnStatMetadata().get();
+        // for the commit time column, min and max should be equal: small file handling is disabled, so every commit creates a new file
+        assertEquals(metadataColumnStats.getMinValue(), metadataColumnStats.getMaxValue());
+        String fileName = metadataColumnStats.getFileName();
+        if (fileName.contains(firstCommit)) {
+          assertTrue(commitToPartitionsToFiles.get(firstCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+              .contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
+        } else {
+          assertTrue(commitToPartitionsToFiles.get(secondCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+              .contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
+        }
+      });
+    }
+  }
+
   /**
    * Test all major table operations with the given table, config and context.
    *
@@ -1476,8 +1613,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
     Properties properties = new Properties();
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"1000");
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"20");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
@@ -1540,7 +1677,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
     Properties properties = new Properties();
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
 
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -1871,7 +2008,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     Properties properties = new Properties();
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
@@ -2364,7 +2501,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
-            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordIterator()) {
               recordItr.forEachRemaining(indexRecord -> {
                 final GenericRecord record = (GenericRecord) indexRecord;
                 final GenericRecord colStatsRecord = (GenericRecord) record.get(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 323724a4fe..9a8fc55a20 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -21,9 +21,9 @@ package org.apache.hudi.client.functional;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -51,8 +51,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
-
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -292,7 +290,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
-            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordIterator()) {
               recordItr.forEachRemaining(indexRecord -> {
                 final GenericRecord record = (GenericRecord) indexRecord;
                 assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
@@ -361,10 +359,10 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
     HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(),
         new Path(baseFile.getPath()),
         new CacheConfig(context.getHadoopConf().get()));
-    List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
+    List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
-      assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
-      final String keyInPayload = (String) ((GenericRecord) entry.getSecond())
+      assertNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+      final String keyInPayload = (String) ((GenericRecord) entry)
           .get(HoodieMetadataPayload.KEY_FIELD_NAME);
       assertFalse(keyInPayload.isEmpty());
     });
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index c1f05f9c99..75d2d14221 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -67,7 +67,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.io.storage.HoodieHFileReader.KEY_SCHEMA;
+import static org.apache.hudi.io.storage.HoodieHFileReader.SCHEMA_KEY;
 
 /**
  * Utility methods to aid testing inside the HoodieClient module.
@@ -247,7 +247,7 @@ public class HoodieClientTestUtils {
         HFile.Reader reader =
             HoodieHFileUtils.createHFileReader(fs, new Path(path), cacheConfig, fs.getConf());
         if (schema == null) {
-          schema = new Schema.Parser().parse(new String(reader.getHFileInfo().get(KEY_SCHEMA.getBytes())));
+          schema = new Schema.Parser().parse(new String(reader.getHFileInfo().get(SCHEMA_KEY.getBytes())));
         }
         HFileScanner scanner = reader.getScanner(false, false);
         if (!scanner.seekTo()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 8c88bfb001..ec70653b9c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -69,7 +69,7 @@ public abstract class BaseHoodieTableFileIndex {
   private final String[] partitionColumns;
 
   private final FileSystemViewStorageConfig fileSystemStorageConfig;
-  private final HoodieMetadataConfig metadataConfig;
+  protected final HoodieMetadataConfig metadataConfig;
 
   private final HoodieTableQueryType queryType;
   private final Option<String> specifiedQueryInstant;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 9ff3fd57d2..c5753cc3da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -289,8 +289,8 @@ public final class HoodieMetadataConfig extends HoodieConfig {
     return getString(DIR_FILTER_REGEX);
   }
 
-  public boolean enableFullScan() {
-    return getBoolean(ENABLE_FULL_SCAN_LOG_FILES);
+  public boolean allowFullScan() {
+    return getBooleanOrDefault(ENABLE_FULL_SCAN_LOG_FILES);
   }
 
   public boolean populateMetaFields() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index bf5ab9fd0f..9e56083b26 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -57,7 +57,6 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
@@ -129,7 +128,7 @@ public abstract class AbstractHoodieLogRecordReader {
   // Store the last instant log blocks (needed to implement rollback)
   private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
   // Enables full scan of log records
-  protected final boolean enableFullScan;
+  protected final boolean forceFullScan;
   private int totalScannedLogFiles;
   // Progress
   private float progress = 0.0f;
@@ -150,7 +149,7 @@ public abstract class AbstractHoodieLogRecordReader {
   protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
                                           Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
                                           boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
-                                          boolean withOperationField, boolean enableFullScan,
+                                          boolean withOperationField, boolean forceFullScan,
                                           Option<String> partitionName, InternalSchema internalSchema) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
@@ -167,7 +166,7 @@ public abstract class AbstractHoodieLogRecordReader {
     this.bufferSize = bufferSize;
     this.instantRange = instantRange;
     this.withOperationField = withOperationField;
-    this.enableFullScan = enableFullScan;
+    this.forceFullScan = forceFullScan;
     this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
     this.path = basePath;
 
@@ -189,10 +188,14 @@ public abstract class AbstractHoodieLogRecordReader {
   }
 
   public synchronized void scan() {
-    scan(Option.empty());
+    scanInternal(Option.empty());
   }
 
-  public synchronized void scan(Option<List<String>> keys) {
+  public synchronized void scan(List<String> keys) {
+    scanInternal(Option.of(new KeySpec(keys, true)));
+  }
+
+  protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     currentInstantLogBlocks = new ArrayDeque<>();
     progress = 0.0f;
     totalLogFiles = new AtomicLong(0);
@@ -205,15 +208,16 @@ public abstract class AbstractHoodieLogRecordReader {
     HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
     HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
     try {
-
       // Get the key field based on populate meta fields config
       // and the table type
       final String keyField = getKeyField();
 
       // Iterate over the paths
+      boolean enableRecordLookups = !forceFullScan;
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
-          readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan, keyField, internalSchema);
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
         HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
@@ -250,7 +254,7 @@ public abstract class AbstractHoodieLogRecordReader {
             if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
               // If this is an avro data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
-              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
+              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
             }
             // store the current block
             currentInstantLogBlocks.push(logBlock);
@@ -260,7 +264,7 @@ public abstract class AbstractHoodieLogRecordReader {
             if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
               // If this is a delete data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
-              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
+              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
             }
             // store deletes so can be rolled back
             currentInstantLogBlocks.push(logBlock);
@@ -335,7 +339,7 @@ public abstract class AbstractHoodieLogRecordReader {
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty()) {
         LOG.info("Merging the final data blocks");
-        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
+        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
       }
       // Done
       progress = 1.0f;
@@ -370,11 +374,11 @@ public abstract class AbstractHoodieLogRecordReader {
    * Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
    * handle it.
    */
-  private void processDataBlock(HoodieDataBlock dataBlock, Option<List<String>> keys) throws Exception {
-    try (ClosableIterator<IndexedRecord> recordItr = dataBlock.getRecordItr(keys.orElse(Collections.emptyList()))) {
+  private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
+    try (ClosableIterator<IndexedRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt)) {
       Option<Schema> schemaOption = getMergedSchema(dataBlock);
-      while (recordItr.hasNext()) {
-        IndexedRecord currentRecord = recordItr.next();
+      while (recordIterator.hasNext()) {
+        IndexedRecord currentRecord = recordIterator.next();
         IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : currentRecord;
         processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
             this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
@@ -449,23 +453,20 @@ public abstract class AbstractHoodieLogRecordReader {
    * Process the set of log blocks belonging to the last instant which is read fully.
    */
   private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int numLogFilesSeen,
-                                             Option<List<String>> keys) throws Exception {
+                                             Option<KeySpec> keySpecOpt) throws Exception {
     while (!logBlocks.isEmpty()) {
       LOG.info("Number of remaining logblocks to merge " + logBlocks.size());
       // poll the element at the bottom of the stack since that's the order it was inserted
       HoodieLogBlock lastBlock = logBlocks.pollLast();
       switch (lastBlock.getBlockType()) {
         case AVRO_DATA_BLOCK:
-          processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
+          processDataBlock((HoodieAvroDataBlock) lastBlock, keySpecOpt);
           break;
         case HFILE_DATA_BLOCK:
-          if (!keys.isPresent()) {
-            keys = Option.of(Collections.emptyList());
-          }
-          processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
+          processDataBlock((HoodieHFileDataBlock) lastBlock, keySpecOpt);
           break;
         case PARQUET_DATA_BLOCK:
-          processDataBlock((HoodieParquetDataBlock) lastBlock, keys);
+          processDataBlock((HoodieParquetDataBlock) lastBlock, keySpecOpt);
           break;
         case DELETE_BLOCK:
           Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
@@ -481,6 +482,15 @@ public abstract class AbstractHoodieLogRecordReader {
     progress = numLogFilesSeen - 1 / logFilePaths.size();
   }
 
+  private ClosableIterator<IndexedRecord> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
+    if (keySpecOpt.isPresent()) {
+      KeySpec keySpec = keySpecOpt.get();
+      return dataBlock.getRecordIterator(keySpec.keys, keySpec.fullKey);
+    }
+
+    return dataBlock.getRecordIterator();
+  }
+
   /**
    * Return progress of scanning as a float between 0.0 to 1.0.
    */
@@ -504,7 +514,7 @@ public abstract class AbstractHoodieLogRecordReader {
     return payloadClassFQN;
   }
 
-  protected Option<String> getPartitionName() {
+  public Option<String> getPartitionName() {
     return partitionName;
   }
 
@@ -520,6 +530,16 @@ public abstract class AbstractHoodieLogRecordReader {
     return withOperationField;
   }
 
+  protected static class KeySpec {
+    private final List<String> keys;
+    private final boolean fullKey;
+
+    public KeySpec(List<String> keys, boolean fullKey) {
+      this.keys = keys;
+      this.fullKey = fullKey;
+    }
+  }
+
   /**
    * Builder used to build {@code AbstractHoodieLogRecordScanner}.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 0276c97a00..c48107e392 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -53,13 +53,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
   private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
 
   HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
-                        boolean reverseLogReader, int bufferSize, boolean enableInlineReading,
-                        String recordKeyField) throws IOException {
-    this(fs, logFiles, readerSchema, readBlocksLazily, reverseLogReader, bufferSize, enableInlineReading, recordKeyField, InternalSchema.getEmptyInternalSchema());
-  }
-
-  HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
-                        boolean reverseLogReader, int bufferSize, boolean enableInlineReading,
+                        boolean reverseLogReader, int bufferSize, boolean enableRecordLookups,
                         String recordKeyField, InternalSchema internalSchema) throws IOException {
     this.logFiles = logFiles;
     this.fs = fs;
@@ -69,12 +63,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
     this.bufferSize = bufferSize;
     this.prevReadersInOpenState = new ArrayList<>();
     this.recordKeyField = recordKeyField;
-    this.enableInlineReading = enableInlineReading;
+    this.enableInlineReading = enableRecordLookups;
     this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
     if (logFiles.size() > 0) {
       HoodieLogFile nextLogFile = logFiles.remove(0);
       this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
-          enableInlineReading, recordKeyField, internalSchema);
+          enableRecordLookups, recordKeyField, internalSchema);
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index cc96f2d692..ed18736443 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -45,6 +45,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 /**
  * Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of records which will
  * be used as a lookup table when merging the base columnar file with the redo log file.
@@ -76,14 +78,14 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
   protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
                                          boolean reverseReader, int bufferSize, String spillableMapBasePath,
-                                         Option<InstantRange> instantRange, boolean autoScan,
+                                         Option<InstantRange> instantRange,
                                          ExternalSpillableMap.DiskMapType diskMapType,
                                          boolean isBitCaskDiskMapCompressionEnabled,
-                                         boolean withOperationField, boolean enableFullScan,
+                                         boolean withOperationField, boolean forceFullScan,
                                          Option<String> partitionName, InternalSchema internalSchema) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
         instantRange, withOperationField,
-        enableFullScan, partitionName, internalSchema);
+        forceFullScan, partitionName, internalSchema);
     try {
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
@@ -93,7 +95,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
       throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
     }
 
-    if (autoScan) {
+    if (forceFullScan) {
       performScan();
     }
   }
@@ -115,10 +117,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
 
   @Override
   public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
+    checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
     return records.iterator();
   }
 
   public Map<String, HoodieRecord<? extends HoodieRecordPayload>> getRecords() {
+    checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
     return records;
   }
 
@@ -211,8 +215,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     // incremental filtering
     protected Option<InstantRange> instantRange = Option.empty();
     protected String partitionName;
-    // auto scan default true
-    private boolean autoScan = true;
     // operation field default false
     private boolean withOperationField = false;
 
@@ -290,11 +292,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
       return this;
     }
 
-    public Builder withAutoScan(boolean autoScan) {
-      this.autoScan = autoScan;
-      return this;
-    }
-
     public Builder withInternalSchema(InternalSchema internalSchema) {
       this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
       return this;
@@ -315,7 +312,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     public HoodieMergedLogRecordScanner build() {
       return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
           latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
-          bufferSize, spillableMapBasePath, instantRange, autoScan,
+          bufferSize, spillableMapBasePath, instantRange,
           diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true,
           Option.ofNullable(partitionName), internalSchema);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index a04a32bf42..491c6700c9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -314,7 +314,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
     output.write(schemaContent);
 
     List<IndexedRecord> records = new ArrayList<>();
-    try (ClosableIterator<IndexedRecord> recordItr = getRecordItr()) {
+    try (ClosableIterator<IndexedRecord> recordItr = getRecordIterator()) {
       recordItr.forEachRemaining(records::add);
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index dd2c03b42c..c83b3bc82d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -138,7 +138,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   /**
    * Returns all the records iterator contained w/in this block.
    */
-  public final ClosableIterator<IndexedRecord> getRecordItr() {
+  public final ClosableIterator<IndexedRecord> getRecordIterator() {
     if (records.isPresent()) {
       return list2Iterator(records.get());
     }
@@ -162,21 +162,21 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
    * @return List of IndexedRecords for the keys of interest.
    * @throws IOException in case of failures encountered when reading/parsing records
    */
-  public final ClosableIterator<IndexedRecord> getRecordItr(List<String> keys) throws IOException {
+  public final ClosableIterator<IndexedRecord> getRecordIterator(List<String> keys, boolean fullKey) throws IOException {
     boolean fullScan = keys.isEmpty();
     if (enablePointLookups && !fullScan) {
-      return lookupRecords(keys);
+      return lookupRecords(keys, fullKey);
     }
 
     // Otherwise, we fetch all the records and filter out all the records, but the
     // ones requested
-    ClosableIterator<IndexedRecord> allRecords = getRecordItr();
+    ClosableIterator<IndexedRecord> allRecords = getRecordIterator();
     if (fullScan) {
       return allRecords;
     }
 
     HashSet<String> keySet = new HashSet<>(keys);
-    return FilteringIterator.getInstance(allRecords, keySet, this::getRecordKey);
+    return FilteringIterator.getInstance(allRecords, keySet, fullKey, this::getRecordKey);
   }
 
   protected ClosableIterator<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
@@ -193,7 +193,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     }
   }
 
-  protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+  protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys, boolean fullKey) throws IOException {
     throw new UnsupportedOperationException(
         String.format("Point lookups are not supported by this Data block type (%s)", getBlockType())
     );
@@ -252,21 +252,25 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     private final ClosableIterator<T> nested; // nested iterator
 
     private final Set<String> keys; // the filtering keys
+    private final boolean fullKey;
+
     private final Function<T, Option<String>> keyExtract; // function to extract the key
 
     private T next;
 
-    private FilteringIterator(ClosableIterator<T> nested, Set<String> keys, Function<T, Option<String>> keyExtract) {
+    private FilteringIterator(ClosableIterator<T> nested, Set<String> keys, boolean fullKey, Function<T, Option<String>> keyExtract) {
       this.nested = nested;
       this.keys = keys;
+      this.fullKey = fullKey;
       this.keyExtract = keyExtract;
     }
 
     public static <T extends IndexedRecord> FilteringIterator<T> getInstance(
         ClosableIterator<T> nested,
         Set<String> keys,
+        boolean fullKey,
         Function<T, Option<String>> keyExtract) {
-      return new FilteringIterator<>(nested, keys, keyExtract);
+      return new FilteringIterator<>(nested, keys, fullKey, keyExtract);
     }
 
     @Override
@@ -278,7 +282,13 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     public boolean hasNext() {
       while (this.nested.hasNext()) {
         this.next = this.nested.next();
-        if (keys.contains(keyExtract.apply(this.next).orElse(null))) {
+        String key = keyExtract.apply(this.next)
+            .orElseGet(() -> {
+              throw new IllegalStateException(String.format("Record without a key (%s)", this.next));
+            });
+
+        if (fullKey && keys.contains(key)
+            || !fullKey && keys.stream().anyMatch(key::startsWith)) {
           return true;
         }
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 26c9db5a15..72cb3a0ef3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.log.block;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.inline.InLineFSUtils;
@@ -47,6 +48,7 @@ import org.apache.log4j.Logger;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -149,6 +151,8 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
       }
     });
 
+    writer.appendFileInfo(HoodieHFileReader.SCHEMA_KEY.getBytes(), getSchema().toString().getBytes());
+
     writer.close();
     ostream.flush();
     ostream.close();
@@ -163,11 +167,9 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     // Get schema from the header
     Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
 
+    FileSystem fs = FSUtils.getFs(pathForReader.toString(), new Configuration());
     // Read the content
-    HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(
-        FSUtils.getFs(pathForReader.toString(), new Configuration()), pathForReader, content);
-    // Sets up the writer schema
-    reader.withSchema(writerSchema);
+    HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(fs, pathForReader, content, Option.of(writerSchema));
     Iterator<IndexedRecord> recordIterator = reader.getRecordIterator(readerSchema);
     return new ClosableIterator<IndexedRecord>() {
       @Override
@@ -189,7 +191,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
 
   // TODO abstract this w/in HoodieDataBlock
   @Override
-  protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+  protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys, boolean fullKey) throws IOException {
     HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
 
     // NOTE: It's important to extend Hadoop configuration here to make sure configuration
@@ -204,13 +206,18 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
         blockContentLoc.getContentPositionInLogFile(),
         blockContentLoc.getBlockSize());
 
-    // HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks.
-    Collections.sort(keys);
+    // HFile read will be efficient if keys are sorted, since on storage records are sorted by key.
+    // This will avoid unnecessary seeks.
+    List<String> sortedKeys = new ArrayList<>(keys);
+    Collections.sort(sortedKeys);
 
     final HoodieHFileReader<IndexedRecord> reader =
              new HoodieHFileReader<>(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf));
+
     // Get writer's schema from the header
-    final ClosableIterator<IndexedRecord> recordIterator = reader.getRecordIterator(keys, readerSchema);
+    final ClosableIterator<IndexedRecord> recordIterator =
+        fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
+
     return new ClosableIterator<IndexedRecord>() {
       @Override
       public boolean hasNext() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 21c7c4db21..a9b25844ec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -257,7 +257,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
               HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
               // TODO If we can store additional metadata in datablock, we can skip parsing records
               // (such as startTime, endTime of records in the block)
-              try (ClosableIterator<IndexedRecord> itr = avroBlock.getRecordItr()) {
+              try (ClosableIterator<IndexedRecord> itr = avroBlock.getRecordIterator()) {
                 StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
                     // Filter blocks in desired time window
                     .filter(r -> commitsFilter.apply((GenericRecord) r))
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
index 5673921721..9040a04d5e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -32,9 +32,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 public class CollectionUtils {
 
@@ -48,6 +51,16 @@ public class CollectionUtils {
     return !isNullOrEmpty(c);
   }
 
+  /**
+   * Wraps provided {@link Iterator} into a sequential (non-parallel) {@link Stream}
+   */
+  public static <T> Stream<T> toStream(Iterator<T> iterator) {
+    return StreamSupport.stream(
+        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
+        false
+    );
+  }
+
   /**
    * Combines provided arrays into one
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
index cb330b8143..6490425c42 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
@@ -18,32 +18,28 @@
 
 package org.apache.hudi.io.storage;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 
-public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable {
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Set;
 
-  public String[] readMinMaxRecordKeys();
+public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable {
 
-  public BloomFilter readBloomFilter();
+  String[] readMinMaxRecordKeys();
 
-  public Set<String> filterRowKeys(Set<String> candidateRowKeys);
+  BloomFilter readBloomFilter();
 
-  default Map<String, R> getRecordsByKeys(List<String> rowKeys) throws IOException {
-    throw new UnsupportedOperationException();
-  }
+  Set<String> filterRowKeys(Set<String> candidateRowKeys);
 
-  public Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
+  ClosableIterator<R> getRecordIterator(Schema readerSchema) throws IOException;
 
-  default Iterator<R> getRecordIterator() throws IOException {
+  default ClosableIterator<R> getRecordIterator() throws IOException {
     return getRecordIterator(getSchema());
   }
 
@@ -55,6 +51,22 @@ public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable
     return getRecordByKey(key, getSchema());
   }
 
+  default ClosableIterator<R> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  default ClosableIterator<R> getRecordsByKeysIterator(List<String> keys) throws IOException {
+    return getRecordsByKeysIterator(keys, getSchema());
+  }
+
+  default ClosableIterator<R> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
+    throw new UnsupportedOperationException(); // optional capability: same default as getRecordsByKeysIterator, readers supporting key-prefix lookups override it
+  }
+
+  default ClosableIterator<R> getRecordsByKeyPrefixIterator(List<String> keyPrefixes) throws IOException {
+    return getRecordsByKeyPrefixIterator(keyPrefixes, getSchema());
+  }
+
   Schema getSchema();
 
   void close();
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
index 90440345f7..412b7e4a54 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
@@ -18,21 +18,10 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterFactory;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
@@ -44,97 +33,117 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.LazyRef;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * NOTE: PLEASE READ DOCS & COMMENTS CAREFULLY BEFORE MAKING CHANGES
+ * <p>
+ * {@link HoodieFileReader} implementation allowing to read from {@link HFile}.
+ */
 public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileReader<R> {
-  public static final String KEY_FIELD_NAME = "key";
-  public static final String KEY_SCHEMA = "schema";
+
+  // TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling
+  public static final String SCHEMA_KEY = "schema";
   public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
   public static final String KEY_BLOOM_FILTER_TYPE_CODE = "bloomFilterTypeCode";
+
+  public static final String KEY_FIELD_NAME = "key";
   public static final String KEY_MIN_RECORD = "minRecordKey";
   public static final String KEY_MAX_RECORD = "maxRecordKey";
 
   private static final Logger LOG = LogManager.getLogger(HoodieHFileReader.class);
 
-  private Path path;
-  private Configuration conf;
-  private HFile.Reader reader;
-  private FSDataInputStream fsDataInputStream;
-  private Schema schema;
-  // Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each
-  // key retrieval.
-  private HFileScanner keyScanner;
-
-  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig) throws IOException {
-    this.conf = configuration;
-    this.path = path;
-    this.reader = HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
+  private final Path path;
+
+  private final LazyRef<Schema> schema;
+
+  // NOTE: Reader is ONLY THREAD-SAFE for {@code Scanner} operating in Positional Read ("pread")
+  //       mode (ie created w/ "pread = true")
+  private final HFile.Reader reader;
+  // NOTE: Scanner caches read blocks, therefore it's important to re-use scanner
+  //       wherever possible
+  private final HFileScanner sharedScanner;
+
+  private final Object sharedScannerLock = new Object();
+
+  public HoodieHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig) throws IOException {
+    this(path,
+        HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), hadoopConf), path, cacheConfig, hadoopConf),
+        Option.empty());
   }
 
-  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
-    this.conf = configuration;
-    this.path = path;
-    this.fsDataInputStream = fs.open(path);
-    this.reader = HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, configuration);
+  public HoodieHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
+    this(path, HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, hadoopConf), Option.empty());
   }
 
-  public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content) throws IOException {
-    this.reader = HoodieHFileUtils.createHFileReader(fs, dummyPath, content);
+  public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content, Option<Schema> schemaOpt) throws IOException {
+    this(null, HoodieHFileUtils.createHFileReader(fs, dummyPath, content), schemaOpt);
+  }
+
+  public HoodieHFileReader(Path path, HFile.Reader reader, Option<Schema> schemaOpt) throws IOException {
+    this.path = path;
+    this.reader = reader;
+    // For shared scanner, which is primarily used for point-lookups, we're caching blocks
+    // by default, to minimize amount of traffic to the underlying storage
+    this.sharedScanner = getHFileScanner(reader, true);
+    this.schema = schemaOpt.map(LazyRef::eager)
+        .orElseGet(() -> LazyRef.lazy(() -> fetchSchema(reader)));
   }
 
   @Override
   public String[] readMinMaxRecordKeys() {
+    // NOTE: This access to reader is thread-safe
     HFileInfo fileInfo = reader.getHFileInfo();
-    return new String[] {new String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
+    return new String[]{new String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
         new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
   }
 
-  @Override
-  public Schema getSchema() {
-    if (schema == null) {
-      HFileInfo fileInfo = reader.getHFileInfo();
-      schema = new Schema.Parser().parse(new String(fileInfo.get(KEY_SCHEMA.getBytes())));
-    }
-
-    return schema;
-  }
-
-  /**
-   * Sets up the writer schema explicitly.
-   */
-  public void withSchema(Schema schema) {
-    this.schema = schema;
-  }
-
   @Override
   public BloomFilter readBloomFilter() {
-    HFileInfo fileInfo;
     try {
-      fileInfo = reader.getHFileInfo();
-      ByteBuff serializedFilter = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false).getBufferWithoutHeader();
-      byte[] filterBytes = new byte[serializedFilter.remaining()];
-      serializedFilter.get(filterBytes); // read the bytes that were written
-      return BloomFilterFactory.fromString(new String(filterBytes),
+      // NOTE: This access to reader is thread-safe
+      HFileInfo fileInfo = reader.getHFileInfo();
+      ByteBuff buf = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false).getBufferWithoutHeader();
+      // We have to copy the bytes here: the buffer's underlying array can't be
+      // reused as is, because it also contains additional metadata (header)
+      byte[] bytes = new byte[buf.remaining()];
+      buf.get(bytes);
+      return BloomFilterFactory.fromString(new String(bytes),
           new String(fileInfo.get(KEY_BLOOM_FILTER_TYPE_CODE.getBytes())));
     } catch (IOException e) {
       throw new HoodieException("Could not read bloom filter from " + path, e);
     }
   }
 
+  @Override
+  public Schema getSchema() {
+    return schema.get();
+  }
+
   /**
    * Filter keys by availability.
    * <p>
@@ -145,289 +154,420 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
    */
   @Override
   public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
-    return candidateRowKeys.stream().filter(k -> {
-      try {
-        return isKeyAvailable(k);
-      } catch (IOException e) {
-        LOG.error("Failed to check key availability: " + k);
-        return false;
-      }
-    }).collect(Collectors.toSet());
-  }
+    checkState(candidateRowKeys instanceof TreeSet,
+        String.format("HFile reader expects a TreeSet as iterating over ordered keys is more performant, got (%s)", candidateRowKeys.getClass().getSimpleName()));
 
-  @Override
-  public Map<String, R> getRecordsByKeys(List<String> rowKeys) throws IOException {
-    return filterRecordsImpl(new TreeSet<>(rowKeys));
+    synchronized (sharedScannerLock) {
+      return candidateRowKeys.stream().filter(k -> {
+        try {
+          return isKeyAvailable(k, sharedScanner);
+        } catch (IOException e) {
+          LOG.error("Failed to check key availability: " + k);
+          return false;
+        }
+      }).collect(Collectors.toSet());
+    }
   }
 
-  /**
-   * Filter records by sorted keys.
-   * <p>
-   * TODO: Implement single seek and sequential scan till the last candidate key
-   * instead of repeated seeks.
-   *
-   * @param sortedCandidateRowKeys - Sorted set of keys to fetch records for
-   * @return Map of keys to fetched records
-   * @throws IOException When the deserialization of records fail
-   */
-  private synchronized Map<String, R> filterRecordsImpl(TreeSet<String> sortedCandidateRowKeys) throws IOException {
-    HashMap<String, R> filteredRecords = new HashMap<>();
-    for (String key : sortedCandidateRowKeys) {
-      Option<R> record = getRecordByKey(key);
-      if (record.isPresent()) {
-        filteredRecords.put(key, record.get());
-      }
+  @SuppressWarnings("unchecked")
+  @Override
+  public Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException {
+    synchronized (sharedScannerLock) {
+      return (Option<R>) fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema);
     }
-    return filteredRecords;
   }
 
-  /**
-   * Reads all the records with given schema.
-   *
-   * <p>NOTE: This should only be used for testing,
-   * the records are materialized eagerly into a list and returned,
-   * use {@code getRecordIterator} where possible.
-   */
-  private List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema readerSchema) {
-    final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
-    List<Pair<String, R>> recordList = new LinkedList<>();
-    try {
-      final HFileScanner scanner = reader.getScanner(false, false);
-      if (scanner.seekTo()) {
-        do {
-          Cell c = scanner.getCell();
-          final Pair<String, R> keyAndRecordPair = getRecordFromCell(c, writerSchema, readerSchema, keyFieldSchema);
-          recordList.add(keyAndRecordPair);
-        } while (scanner.next());
-      }
+  @SuppressWarnings("unchecked")
+  @Override
+  public ClosableIterator<R> getRecordIterator(Schema readerSchema) throws IOException {
+    // TODO eval whether seeking scanner would be faster than pread
+    HFileScanner scanner = getHFileScanner(reader, false);
+    return (ClosableIterator<R>) new RecordIterator(scanner, getSchema(), readerSchema);
+  }
 
-      return recordList;
-    } catch (IOException e) {
-      throw new HoodieException("Error reading hfile " + path + " as a dataframe", e);
-    }
+  @SuppressWarnings("unchecked")
+  @Override
+  public ClosableIterator<R> getRecordsByKeysIterator(List<String> keys, Schema readerSchema) throws IOException {
+    // We're caching blocks for this scanner to minimize the amount of traffic
+    // to the underlying storage, as we fetch (potentially) sparsely distributed
+    // keys
+    HFileScanner scanner = getHFileScanner(reader, true);
+    return (ClosableIterator<R>) new RecordByKeyIterator(scanner, keys, getSchema(), readerSchema);
   }
 
-  /**
-   * Reads all the records with current schema.
-   *
-   * <p>NOTE: This should only be used for testing,
-   * the records are materialized eagerly into a list and returned,
-   * use {@code getRecordIterator} where possible.
-   */
-  public List<Pair<String, R>> readAllRecords() {
-    Schema schema = getSchema();
-    return readAllRecords(schema, schema);
+  @SuppressWarnings("unchecked")
+  @Override
+  public ClosableIterator<R> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema readerSchema) throws IOException {
+    // We're caching blocks for this scanner to minimize the amount of traffic
+    // to the underlying storage, as we fetch records for (potentially) sparsely
+    // distributed key prefixes
+    HFileScanner scanner = getHFileScanner(reader, true);
+    return (ClosableIterator<R>) new RecordByKeyPrefixIterator(scanner, keyPrefixes, getSchema(), readerSchema);
   }
 
-  /**
-   * Reads all the records with current schema and filtering keys.
-   *
-   * <p>NOTE: This should only be used for testing,
-   * the records are materialized eagerly into a list and returned,
-   * use {@code getRecordIterator} where possible.
-   */
-  public List<Pair<String, R>> readRecords(List<String> keys) throws IOException {
-    return readRecords(keys, getSchema());
+  @Override
+  public long getTotalRecords() {
+    // NOTE: This access to reader is thread-safe
+    return reader.getEntries();
   }
 
-  /**
-   * Reads all the records with given schema and filtering keys.
-   *
-   * <p>NOTE: This should only be used for testing,
-   * the records are materialized eagerly into a list and returned,
-   * use {@code getRecordIterator} where possible.
-   */
-  public List<Pair<String, R>> readRecords(List<String> keys, Schema schema) throws IOException {
-    this.schema = schema;
-    List<Pair<String, R>> records = new ArrayList<>();
-    for (String key: keys) {
-      Option<R> value = getRecordByKey(key, schema);
-      if (value.isPresent()) {
-        records.add(new Pair(key, value.get()));
+  @Override
+  public void close() {
+    try {
+      synchronized (this) {
+        reader.close();
       }
+    } catch (IOException e) {
+      throw new HoodieIOException("Error closing the hfile reader", e);
     }
-    return records;
   }
 
-  public ClosableIterator<R> getRecordIterator(List<String> keys, Schema schema) throws IOException {
-    this.schema = schema;
-    Iterator<String> iterator = keys.iterator();
-    return new ClosableIterator<R>() {
-      private R next;
-      @Override
-      public void close() {
+  private boolean isKeyAvailable(String key, HFileScanner keyScanner) throws IOException {
+    final KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+    return keyScanner.seekTo(kv) == 0;
+  }
+
+  private static Iterator<GenericRecord> getRecordByKeyPrefixIteratorInternal(HFileScanner scanner,
+                                                                              String keyPrefix,
+                                                                              Schema writerSchema,
+                                                                              Schema readerSchema) throws IOException {
+    KeyValue kv = new KeyValue(keyPrefix.getBytes(), null, null, null);
+
+    // NOTE: HFile persists both keys/values as bytes, therefore lexicographical sort order is
+    //       essentially employed
+    //
+    // For the HFile containing list of cells c[0], c[1], ..., c[N], `seekTo(cell)` would return
+    // following:
+    //    a) -1, if cell < c[0], no position;
+    //    b) 0, such that c[i] = cell and scanner is left in position i;
+    //    c) and 1, such that c[i] < cell, and scanner is left in position i.
+    //
+    // Consider entries w/ the following keys in HFile: [key01, key02, key03, key04,..., key20];
+    // In case looked up key-prefix is
+    //    - "key", `seekTo()` will return -1 and place the cursor just before "key01",
+    //    `getCell()` will return "key01" entry
+    //    - "key03", `seekTo()` will return 0 (exact match) and place the cursor just before "key03",
+    //    `getCell()` will return "key03" entry
+    //    - "key1", `seekTo()` will return 1 (no exact match) and place the cursor on the last entry
+    //    lower than the prefix, i.e. on "key09", just before "key10";
+    //
+    int val = scanner.seekTo(kv);
+    if (val == 1) {
+      // Try moving to next entry, matching the prefix key; if we're at the EOF,
+      // `next()` will return false
+      if (!scanner.next()) {
+        return Collections.emptyIterator();
       }
+    }
+
+    class KeyPrefixIterator implements Iterator<GenericRecord> {
+      private GenericRecord next = null;
+      private boolean eof = false;
 
       @Override
       public boolean hasNext() {
-        try {
-          while (iterator.hasNext()) {
-            Option<R> value = getRecordByKey(iterator.next(), schema);
-            if (value.isPresent()) {
-              next = value.get();
-              return true;
-            }
-          }
+        if (next != null) {
+          return true;
+        } else if (eof) {
+          return false;
+        }
+
+        Cell c = Objects.requireNonNull(scanner.getCell());
+        byte[] keyBytes = copyKeyFromCell(c);
+        String key = new String(keyBytes);
+        // Check whether we're still reading records corresponding to the key-prefix
+        if (!key.startsWith(keyPrefix)) {
           return false;
+        }
+
+        // Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
+        byte[] valueBytes = copyValueFromCell(c);
+        try {
+          next = deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
+          // In case scanner is not able to advance, it means we reached EOF
+          eof = !scanner.next();
         } catch (IOException e) {
-          throw new HoodieIOException("unable to read next record from hfile ", e);
+          throw new HoodieIOException("Failed to deserialize payload", e);
         }
+
+        return true;
       }
 
       @Override
-      public R next() {
+      public GenericRecord next() {
+        GenericRecord next = this.next;
+        this.next = null;
         return next;
       }
-    };
+    }
+
+    return new KeyPrefixIterator();
   }
 
-  @Override
-  public Iterator getRecordIterator(Schema readerSchema) throws IOException {
-    final HFileScanner scanner = reader.getScanner(false, false);
-    final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
-    ValidationUtils.checkState(keyFieldSchema != null,
-        "Missing key field '" + KEY_FIELD_NAME + "' in the schema!");
-    return new Iterator<R>() {
-      private R next = null;
-      private boolean eof = false;
+  private static Option<GenericRecord> fetchRecordByKeyInternal(HFileScanner scanner, String key, Schema writerSchema, Schema readerSchema) throws IOException {
+    KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+    if (scanner.seekTo(kv) != 0) {
+      return Option.empty();
+    }
 
-      @Override
-      public boolean hasNext() {
-        try {
-          // To handle when hasNext() is called multiple times for idempotency and/or the first time
-          if (this.next == null && !this.eof) {
-            if (!scanner.isSeeked() && scanner.seekTo()) {
-              final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getCell(), getSchema(), readerSchema, keyFieldSchema);
-              this.next = keyAndRecordPair.getSecond();
-            }
-          }
-          return this.next != null;
-        } catch (IOException io) {
-          throw new HoodieIOException("unable to read next record from hfile ", io);
-        }
-      }
+    Cell c = scanner.getCell();
+    byte[] valueBytes = copyValueFromCell(c);
+    GenericRecord record = deserialize(key.getBytes(), valueBytes, writerSchema, readerSchema);
 
-      @Override
-      public R next() {
-        try {
-          // To handle case when next() is called before hasNext()
-          if (this.next == null) {
-            if (!hasNext()) {
-              throw new HoodieIOException("No more records left to read from hfile");
-            }
-          }
-          R retVal = this.next;
-          if (scanner.next()) {
-            final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getCell(), getSchema(), readerSchema, keyFieldSchema);
-            this.next = keyAndRecordPair.getSecond();
-          } else {
-            this.next = null;
-            this.eof = true;
-          }
-          return retVal;
-        } catch (IOException io) {
-          throw new HoodieIOException("unable to read next record from parquet file ", io);
-        }
-      }
-    };
+    return Option.of(record);
   }
 
-  private boolean isKeyAvailable(String key) throws IOException {
-    final KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
-    synchronized (this) {
-      if (keyScanner == null) {
-        keyScanner = reader.getScanner(false, false);
-      }
-      if (keyScanner.seekTo(kv) == 0) {
-        return true;
-      }
+  private static GenericRecord getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema) throws IOException {
+    final byte[] keyBytes = copyKeyFromCell(cell);
+    final byte[] valueBytes = copyValueFromCell(cell);
+    return deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
+  }
+
+  private static GenericRecord deserializeUnchecked(final byte[] keyBytes,
+                                                    final byte[] valueBytes,
+                                                    Schema writerSchema,
+                                                    Schema readerSchema) {
+    try {
+      return deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to deserialize payload", e);
     }
-    return false;
   }
 
-  @Override
-  public Option getRecordByKey(String key, Schema readerSchema) throws IOException {
-    byte[] value = null;
-    final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
-    ValidationUtils.checkState(keyFieldSchema != null);
-    KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+  private static GenericRecord deserialize(final byte[] keyBytes,
+                                           final byte[] valueBytes,
+                                           Schema writerSchema,
+                                           Schema readerSchema) throws IOException {
+    GenericRecord record = HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema);
 
-    synchronized (this) {
-      if (keyScanner == null) {
-        keyScanner = reader.getScanner(false, false);
+    getKeySchema(readerSchema).ifPresent(keyFieldSchema -> {
+      final Object keyObject = record.get(keyFieldSchema.pos());
+      if (keyObject != null && keyObject.toString().isEmpty()) {
+        record.put(keyFieldSchema.pos(), new String(keyBytes));
       }
+    });
 
-      if (keyScanner.seekTo(kv) == 0) {
-        Cell c = keyScanner.getCell();
-        // Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
-        value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
-      }
-    }
+    return record;
+  }
 
-    if (value != null) {
-      R record = deserialize(key.getBytes(), value, getSchema(), readerSchema, keyFieldSchema);
-      return Option.of(record);
-    }
+  private static Schema fetchSchema(HFile.Reader reader) {
+    HFileInfo fileInfo = reader.getHFileInfo();
+    return new Schema.Parser().parse(new String(fileInfo.get(SCHEMA_KEY.getBytes())));
+  }
 
-    return Option.empty();
+  private static byte[] copyKeyFromCell(Cell cell) {
+    return Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength());
   }
 
-  private Pair<String, R> getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema, Option<Schema.Field> keyFieldSchema) throws IOException {
-    final byte[] keyBytes = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength());
-    final byte[] valueBytes = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
-    R record = deserialize(keyBytes, valueBytes, writerSchema, readerSchema, keyFieldSchema);
-    return new Pair<>(new String(keyBytes), record);
+  private static byte[] copyValueFromCell(Cell c) {
+    return Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
   }
 
   /**
-   * Deserialize the record byte array contents to record object.
-   *
-   * @param keyBytes       - Record key as byte array
-   * @param valueBytes     - Record content as byte array
-   * @param writerSchema   - Writer schema
-   * @param readerSchema   - Reader schema
-   * @param keyFieldSchema - Key field id in the schema
-   * @return Deserialized record object
+   * NOTE: THIS SHOULD ONLY BE USED FOR TESTING, RECORDS ARE MATERIALIZED EAGERLY
+   * <p>
+   * Reads all the records using the reader's own schema
    */
-  private R deserialize(final byte[] keyBytes, final byte[] valueBytes, Schema writerSchema, Schema readerSchema,
-                        Option<Schema.Field> keyFieldSchema) throws IOException {
-    R record = (R) HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema);
-    materializeRecordIfNeeded(keyBytes, record, keyFieldSchema);
-    return record;
+  public static <R extends IndexedRecord> List<R> readAllRecords(HoodieHFileReader<R> reader) throws IOException {
+    Schema schema = reader.getSchema();
+    return toStream(reader.getRecordIterator(schema))
+        .collect(Collectors.toList());
   }
 
   /**
-   * Materialize the record for any missing fields, if needed.
-   *
-   * @param keyBytes       - Key byte array
-   * @param record         - Record object to materialize
-   * @param keyFieldSchema - Key field id in the schema
+   * NOTE: THIS SHOULD ONLY BE USED FOR TESTING, RECORDS ARE MATERIALIZED EAGERLY
+   * <p>
+   * Reads the records matching the given keys, using the reader's own schema.
    */
-  private void materializeRecordIfNeeded(final byte[] keyBytes, R record, Option<Schema.Field> keyFieldSchema) {
-    if (keyFieldSchema.isPresent()) {
-      final Object keyObject = record.get(keyFieldSchema.get().pos());
-      if (keyObject != null && keyObject.toString().isEmpty()) {
-        record.put(keyFieldSchema.get().pos(), new String(keyBytes));
+  public static <R extends IndexedRecord> List<R> readRecords(HoodieHFileReader<R> reader,
+                                                              List<String> keys) throws IOException {
+    return readRecords(reader, keys, reader.getSchema());
+  }
+
+  /**
+   * NOTE: THIS SHOULD ONLY BE USED FOR TESTING, RECORDS ARE MATERIALIZED EAGERLY
+   * <p>
+   * Reads the records matching the given keys with the given schema; {@code keys} is sorted in place.
+   */
+  public static <R extends IndexedRecord> List<R> readRecords(HoodieHFileReader<R> reader,
+                                                              List<String> keys,
+                                                              Schema schema) throws IOException {
+    Collections.sort(keys);
+    return toStream(reader.getRecordsByKeysIterator(keys, schema))
+        .collect(Collectors.toList());
+  }
+
+  private static HFileScanner getHFileScanner(HFile.Reader reader, boolean cacheBlocks) {
+    // NOTE: Only scanners created in Positional Read ("pread") mode could share the same reader,
+    //       since scanners in default mode will be seeking w/in the underlying stream
+    return reader.getScanner(cacheBlocks, true);
+  }
+
+  private static Option<Schema.Field> getKeySchema(Schema schema) {
+    return Option.ofNullable(schema.getField(KEY_FIELD_NAME));
+  }
+
+  private static class RecordByKeyPrefixIterator implements ClosableIterator<GenericRecord> {
+    private final Iterator<String> keyPrefixesIterator;
+    private Iterator<GenericRecord> recordsIterator;
+
+    private final HFileScanner scanner;
+
+    private final Schema writerSchema;
+    private final Schema readerSchema;
+
+    private GenericRecord next = null;
+
+    RecordByKeyPrefixIterator(HFileScanner scanner, List<String> keyPrefixes, Schema writerSchema, Schema readerSchema) throws IOException {
+      this.keyPrefixesIterator = keyPrefixes.iterator();
+
+      this.scanner = scanner;
+      this.scanner.seekTo(); // position at the beginning of the file
+
+      this.writerSchema = writerSchema;
+      this.readerSchema = readerSchema;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        while (true) {
+          // NOTE: This is required for idempotency
+          if (next != null) {
+            return true;
+          } else if (recordsIterator != null && recordsIterator.hasNext()) {
+            next = recordsIterator.next();
+            return true;
+          } else if (keyPrefixesIterator.hasNext()) {
+            String currentKeyPrefix = keyPrefixesIterator.next();
+            recordsIterator =
+                getRecordByKeyPrefixIteratorInternal(scanner, currentKeyPrefix, writerSchema, readerSchema);
+          } else {
+            return false;
+          }
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to read next record from HFile", e);
       }
     }
+
+    @Override
+    public GenericRecord next() {
+      GenericRecord next = this.next;
+      this.next = null;
+      return next;
+    }
+
+    @Override
+    public void close() {
+      scanner.close();
+    }
   }
 
-  @Override
-  public long getTotalRecords() {
-    return reader.getEntries();
+  private static class RecordByKeyIterator implements ClosableIterator<GenericRecord> {
+    private final Iterator<String> keyIterator;
+
+    private final HFileScanner scanner;
+
+    private final Schema readerSchema;
+    private final Schema writerSchema;
+
+    private GenericRecord next = null;
+
+    RecordByKeyIterator(HFileScanner scanner, List<String> keys, Schema writerSchema, Schema readerSchema) throws IOException {
+      this.keyIterator = keys.iterator();
+
+      this.scanner = scanner;
+      this.scanner.seekTo(); // position at the beginning of the file
+
+      this.writerSchema = writerSchema;
+      this.readerSchema = readerSchema;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        // NOTE: This is required for idempotency
+        if (next != null) {
+          return true;
+        }
+
+        while (keyIterator.hasNext()) {
+          Option<GenericRecord> value = fetchRecordByKeyInternal(scanner, keyIterator.next(), writerSchema, readerSchema);
+          if (value.isPresent()) {
+            next = value.get();
+            return true;
+          }
+        }
+        return false;
+      } catch (IOException e) {
+        throw new HoodieIOException("unable to read next record from hfile ", e);
+      }
+    }
+
+    @Override
+    public GenericRecord next() {
+      GenericRecord next = this.next;
+      this.next = null;
+      return next;
+    }
+
+    @Override
+    public void close() {
+      scanner.close();
+    }
   }
 
-  @Override
-  public synchronized void close() {
-    try {
-      reader.close();
-      reader = null;
-      if (fsDataInputStream != null) {
-        fsDataInputStream.close();
+  private static class RecordIterator implements ClosableIterator<GenericRecord> {
+    private final HFileScanner scanner;
+
+    private final Schema writerSchema;
+    private final Schema readerSchema;
+
+    private GenericRecord next = null;
+
+    RecordIterator(HFileScanner scanner, Schema writerSchema, Schema readerSchema) {
+      this.scanner = scanner;
+      this.writerSchema = writerSchema;
+      this.readerSchema = readerSchema;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        // NOTE: This is required for idempotency
+        if (next != null) {
+          return true;
+        }
+
+        boolean hasRecords;
+        if (!scanner.isSeeked()) {
+          hasRecords = scanner.seekTo();
+        } else {
+          hasRecords = scanner.next();
+        }
+
+        if (!hasRecords) {
+          return false;
+        }
+
+        this.next = getRecordFromCell(scanner.getCell(), writerSchema, readerSchema);
+        return true;
+      } catch (IOException io) {
+        throw new HoodieIOException("unable to read next record from hfile ", io);
       }
-      keyScanner = null;
-    } catch (IOException e) {
-      throw new HoodieIOException("Error closing the hfile reader", e);
+    }
+
+    @Override
+    public GenericRecord next() {
+      GenericRecord next = this.next;
+      this.next = null;
+      return next;
+    }
+
+    @Override
+    public void close() {
+      scanner.close();
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java
index 319f8d7da1..5431bf3782 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java
@@ -18,9 +18,6 @@
 
 package org.apache.hudi.io.storage;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +26,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.OrcReaderIterator;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.orc.OrcFile;
@@ -37,6 +35,9 @@ import org.apache.orc.Reader.Options;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 
+import java.io.IOException;
+import java.util.Set;
+
 public class HoodieOrcReader<R extends IndexedRecord> implements HoodieFileReader {
   private Path path;
   private Configuration conf;
@@ -64,12 +65,12 @@ public class HoodieOrcReader<R extends IndexedRecord> implements HoodieFileReade
   }
 
   @Override
-  public Iterator<R> getRecordIterator(Schema schema) throws IOException {
+  public ClosableIterator<R> getRecordIterator(Schema schema) throws IOException {
     try {
       Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
       TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(schema);
       RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
-      return new OrcReaderIterator(recordReader, schema, orcSchema);
+      return new OrcReaderIterator<>(recordReader, schema, orcSchema);
     } catch (IOException io) {
       throw new HoodieIOException("Unable to create an ORC reader.", io);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
index dc368a2e08..804e4354c7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
@@ -18,12 +18,6 @@
 
 package org.apache.hudi.io.storage;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -31,12 +25,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.ParquetReaderIterator;
-
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
 public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader<R> {
   
   private final Path path;
@@ -66,10 +65,10 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
   }
 
   @Override
-  public Iterator<R> getRecordIterator(Schema schema) throws IOException {
+  public ClosableIterator<R> getRecordIterator(Schema schema) throws IOException {
     AvroReadSupport.setAvroReadSchema(conf, schema);
     ParquetReader<R> reader = AvroParquetReader.<R>builder(path).withConf(conf).build();
-    ParquetReaderIterator parquetReaderIterator = new ParquetReaderIterator<>(reader);
+    ParquetReaderIterator<R> parquetReaderIterator = new ParquetReaderIterator<>(reader);
     readerIterators.add(parquetReaderIterator);
     return parquetReaderIterator;
   }
@@ -81,7 +80,7 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
 
   @Override
   public void close() {
-    readerIterators.forEach(entry -> entry.close());
+    readerIterators.forEach(ParquetReaderIterator::close);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 73b5dcb89f..2036500ac6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -378,7 +378,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
 
   protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName);
 
-  protected abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName);
+  public abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName);
 
   protected HoodieEngineContext getEngineContext() {
     return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get());
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index b4a76a7282..b77bb12c49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -21,9 +21,11 @@ package org.apache.hudi.metadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -159,4 +161,9 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
       throws HoodieMetadataException {
     throw new HoodieMetadataException("Unsupported operation: getColumnsStats!");
   }
+
+  @Override
+  public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName) {
+    throw new HoodieMetadataException("Unsupported operation: getRecordsByKeyPrefixes!");
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index a4bc8c5524..cf941bb70c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -25,23 +28,24 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -49,10 +53,6 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -60,15 +60,22 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
 
 /**
  * Table metadata provided by an internal DFS backed Hudi metadata table.
@@ -77,6 +84,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
   private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);
 
+  private static final Schema METADATA_RECORD_SCHEMA = HoodieMetadataRecord.getClassSchema();
+
   private String metadataBasePath;
   // Metadata table's timeline and metaclient
   private HoodieTableMetaClient metadataMetaClient;
@@ -133,28 +142,79 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   }
 
   @Override
-  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
-                                                                                             String partitionName) {
+  public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
+                                                                                 String partitionName) {
+    // NOTE: Since we partition records to a particular file-group by full key, we will have
+    //       to scan all file-groups for all key-prefixes as each of these might contain some
+    //       records matching the key-prefix
+    List<FileSlice> partitionFileSlices =
+        HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
+
+    return engineContext.parallelize(partitionFileSlices)
+        .flatMap(
+            (SerializableFunction<FileSlice, Iterator<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
+              // NOTE: Since this will be executed by executors, we can't access previously cached
+              //       readers, and therefore have to always open new ones
+              Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+                  openReaders(partitionName, fileSlice);
+              try {
+                List<Long> timings = new ArrayList<>();
+
+                HoodieFileReader baseFileReader = readers.getKey();
+                HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+
+                if (baseFileReader == null && logRecordScanner == null) {
+                  // TODO: what do we do if neither exists? Should we throw an exception and let the caller do the fallback?
+                  return Collections.emptyIterator();
+                }
+
+                boolean fullKeys = false;
+
+                Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
+                    readLogRecords(logRecordScanner, keyPrefixes, fullKeys, timings);
+
+                List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
+                    readFromBaseAndMergeWithLogRecords(baseFileReader, keyPrefixes, fullKeys, logRecords, timings, partitionName);
+
+                LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
+                    keyPrefixes.size(), timings));
+
+                return mergedRecords.iterator();
+              } catch (IOException ioe) {
+                throw new HoodieIOException("Error merging records from metadata table for  " + keyPrefixes.size() + " key : ", ioe);
+              } finally {
+                closeReader(readers);
+              }
+            }
+        )
+        .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+        .filter(Objects::nonNull);
+  }
+
+  @Override
+  public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
+                                                                                          String partitionName) {
     Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSliceToKeysMapping(partitionName, keys);
     List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
     AtomicInteger fileSlicesKeysCount = new AtomicInteger();
     partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
-      Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(partitionName,
-          partitionFileSlicePair.getRight());
+      Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+          getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
       try {
         List<Long> timings = new ArrayList<>();
         HoodieFileReader baseFileReader = readers.getKey();
         HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
-
         if (baseFileReader == null && logRecordScanner == null) {
           return;
         }
 
-        // local map to assist in merging with base file records
-        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner,
-            fileSliceKeys, timings);
-        result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, logRecords,
+        boolean fullKeys = true;
+        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
+            readLogRecords(logRecordScanner, fileSliceKeys, fullKeys, timings);
+
+        result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, fullKeys, logRecords,
             timings, partitionName));
+
         LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
             fileSliceKeys.size(), timings));
         fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
@@ -171,81 +231,127 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   }
 
+  /**
+   * Looks up the given keys (or key-prefixes) in the log files of a file slice.
+   *
+   * <p>Returns an empty map when there is no log-record scanner. When full-scan is allowed for the
+   * scanner's partition, every key is resolved individually via {@code getRecordByKey} (key-prefixes
+   * are rejected in this mode); otherwise the scanner is asked to look the keys, or key-prefixes, up.
+   * The time taken is appended to {@code timings} on every path.
+   *
+   * @param logRecordScanner log-record scanner of the file slice, may be {@code null}
+   * @param keys             full record keys, or key-prefixes when {@code fullKey} is {@code false}
+   * @param fullKey          whether {@code keys} are full keys rather than key-prefixes
+   * @param timings          list the time taken by this read is appended to
+   * @return records looked up in the log files, keyed by record key
+   */
   private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
-                                                                                  List<String> keys, List<Long> timings) {
+                                                                                  List<String> keys,
+                                                                                  boolean fullKey,
+                                                                                  List<Long> timings) {
     HoodieTimer timer = new HoodieTimer().startTimer();
-    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
-    // Retrieve records from log file
     timer.startTimer();
-    if (logRecordScanner != null) {
-      if (metadataConfig.enableFullScan()) {
-        // path which does full scan of log files
-        for (String key : keys) {
-          logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
-        }
-      } else {
-        // this path will do seeks pertaining to the keys passed in
-        List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList = logRecordScanner.getRecordsByKeys(keys);
-        for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
-          logRecords.put(entry.getKey(), entry.getValue());
-        }
+
+    if (logRecordScanner == null) {
+      timings.add(timer.endTimer());
+      return Collections.emptyMap();
+    }
+
+    String partitionName = logRecordScanner.getPartitionName().get();
+
+    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
+    if (isFullScanAllowedForPartition(partitionName)) {
+      checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
+      // Path which does full scan of log files
+      for (String key : keys) {
+        logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
       }
     } else {
-      for (String key : keys) {
-        logRecords.put(key, Option.empty());
+      // This path will do seeks pertaining to the keys passed in
+      List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
+          fullKey ? logRecordScanner.getRecordsByKeys(keys)
+              : logRecordScanner.getRecordsByKeyPrefixes(keys)
+                  .stream()
+                  .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
+                  .collect(Collectors.toList());
+
+      for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
+        logRecords.put(entry.getKey(), entry.getValue());
       }
     }
+
     timings.add(timer.endTimer());
     return logRecords;
   }
 
+  /**
+   * Reads the records for the given keys (or key-prefixes) from the base file, if there is one,
+   * and merges the provided log records into them.
+   *
+   * <p>A log record that has a base-file counterpart is combined with it by calling
+   * {@code preCombine} on the log record's payload; a log record w/o a counterpart is taken as is.
+   * For full keys the result holds one (possibly empty) entry per requested key, in the order of
+   * {@code keys}; for key-prefixes it holds an entry per record that was found.
+   *
+   * @param baseFileReader reader of the base file, {@code null} if the file slice has no base file
+   * @param keys           full record keys, or key-prefixes when {@code fullKeys} is {@code false}
+   * @param fullKeys       whether {@code keys} are full keys rather than key-prefixes
+   * @param logRecords     records previously read from the log files, keyed by record key
+   * @param timings        list the time taken by this read is appended to
+   * @param partitionName  metadata table partition being read
+   * @return pairs of record key and (optional) merged record
+   * @throws IOException if reading the base file fails
+   */
   private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader,
-                                                                                                             List<String> keys, Map<String,
-      Option<HoodieRecord<HoodieMetadataPayload>>> logRecords, List<Long> timings, String partitionName) throws IOException {
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
-    // merge with base records
+                                                                                                             List<String> keys,
+                                                                                                             boolean fullKeys,
+                                                                                                             Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
+                                                                                                             List<Long> timings,
+                                                                                                             String partitionName) throws IOException {
     HoodieTimer timer = new HoodieTimer().startTimer();
     timer.startTimer();
-    HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
-    // Retrieve record from base file
-    if (baseFileReader != null) {
-      HoodieTimer readTimer = new HoodieTimer();
-      Map<String, GenericRecord> baseFileRecords = baseFileReader.getRecordsByKeys(keys);
-      for (String key : keys) {
-        readTimer.startTimer();
-        if (baseFileRecords.containsKey(key)) {
-          hoodieRecord = getRecord(Option.of(baseFileRecords.get(key)), partitionName);
-          metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
-          // merge base file record w/ log record if present
-          if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
-            HoodieRecordPayload mergedPayload = logRecords.get(key).get().getData().preCombine(hoodieRecord.getData());
-            result.add(Pair.of(key, Option.of(new HoodieAvroRecord(hoodieRecord.getKey(), mergedPayload))));
-          } else {
-            // only base record
-            result.add(Pair.of(key, Option.of(hoodieRecord)));
-          }
-        } else {
-          // only log record
-          result.add(Pair.of(key, logRecords.get(key)));
-        }
-      }
-      timings.add(timer.endTimer());
-    } else {
-      // no base file at all
+
+    if (baseFileReader == null) {
+      // No base file at all
       timings.add(timer.endTimer());
-      for (Map.Entry<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecords.entrySet()) {
-        result.add(Pair.of(entry.getKey(), entry.getValue()));
+      if (fullKeys) {
+        // In case full-keys (not key-prefixes) were provided, it's expected that the list of
+        // records will contain an (optional) entry for each corresponding key
+        return keys.stream()
+            .map(key -> Pair.of(key, logRecords.getOrDefault(key, Option.empty())))
+            .collect(Collectors.toList());
+      } else {
+        return logRecords.entrySet().stream()
+            .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
+            .collect(Collectors.toList());
       }
     }
-    return result;
+
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
+
+    HoodieTimer readTimer = new HoodieTimer();
+    readTimer.startTimer();
+
+    Map<String, HoodieRecord<HoodieMetadataPayload>> records =
+        fetchBaseFileRecordsByKeys(baseFileReader, keys, fullKeys, partitionName);
+
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
+
+    // Iterate over all provided log-records, merging them into existing records
+    for (Option<HoodieRecord<HoodieMetadataPayload>> logRecordOpt : logRecords.values()) {
+      if (logRecordOpt.isPresent()) {
+        HoodieRecord<HoodieMetadataPayload> logRecord = logRecordOpt.get();
+        records.merge(
+            logRecord.getRecordKey(),
+            logRecord,
+            (oldRecord, newRecord) ->
+                new HoodieAvroRecord<>(oldRecord.getKey(), newRecord.getData().preCombine(oldRecord.getData()))
+        );
+      }
+    }
+
+    timings.add(timer.endTimer());
+
+    if (fullKeys) {
+      // In case full-keys (not key-prefixes) were provided, it's expected that the list of
+      // records will contain an (optional) entry for each corresponding key
+      return keys.stream()
+          .map(key -> Pair.of(key, Option.ofNullable(records.get(key))))
+          .collect(Collectors.toList());
+    } else {
+      return records.values().stream()
+          .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
+          .collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Fetches the records matching the given keys (or key-prefixes) from the base file.
+   *
+   * @param baseFileReader reader of the base file
+   * @param keys           full record keys, or key-prefixes when {@code fullKeys} is {@code false}
+   * @param fullKeys       whether {@code keys} are full keys rather than key-prefixes
+   * @param partitionName  metadata table partition the records are composed for
+   * @return records read from the base file, keyed by the value of their key field
+   * @throws IOException if reading the base file fails
+   */
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieFileReader<GenericRecord> baseFileReader,
+                                                                                      List<String> keys,
+                                                                                      boolean fullKeys,
+                                                                                      String partitionName) throws IOException {
+    // NOTE: The iterator is closable and has to be closed once drained, otherwise whatever
+    //       it holds on to is leaked; the stream is fully collected before the iterator is closed
+    try (ClosableIterator<GenericRecord> records = fullKeys
+        ? baseFileReader.getRecordsByKeysIterator(keys)
+        : baseFileReader.getRecordsByKeyPrefixIterator(keys)) {
+      return toStream(records)
+          .map(record -> Pair.of(
+              (String) record.get(HoodieMetadataPayload.KEY_FIELD_NAME),
+              composeRecord(record, partitionName)))
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    }
   }
 
-  private HoodieRecord<HoodieMetadataPayload> getRecord(Option<GenericRecord> baseRecord, String partitionName) {
-    ValidationUtils.checkState(baseRecord.isPresent());
+  private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String partitionName) {
     if (metadataTableConfig.populateMetaFields()) {
-      return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+      return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
           metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false);
     }
-    return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+    return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
         metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
         Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()),
         false, Option.of(partitionName));
@@ -279,34 +385,35 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    * Create a file reader and the record scanner for a given partition and file slice
    * if readers are not already available.
    *
-   * @param partitionName - Partition name
-   * @param slice         - The file slice to open readers for
+   * @param partitionName    - Partition name
+   * @param slice            - The file slice to open readers for
    * @return File reader and the record scanner pair for the requested file slice
    */
-  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String partitionName, FileSlice slice) {
-    return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> {
-      try {
-        HoodieTimer timer = new HoodieTimer().startTimer();
-
-        // Open base file reader
-        Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
-        HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
-        final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
-
-        // Open the log record scanner using the log files from the latest file slice
-        List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
-        Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
-            getLogRecordScanner(logFiles, partitionName);
-        HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
-        final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
-
-        metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,
-            +baseFileOpenMs + logScannerOpenMs));
-        return Pair.of(baseFileReader, logRecordScanner);
-      } catch (IOException e) {
-        throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
-      }
-    });
+  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
+    // Readers are cached per (partition name, file id); they are only opened on a cache miss
+    Pair<String, String> readersKey = Pair.of(partitionName, slice.getFileId());
+    return partitionReaders.computeIfAbsent(readersKey, ignored -> openReaders(partitionName, slice));
+  }
+
+  /**
+   * Opens the base file reader and the log record scanner for the given file slice, and reports
+   * the combined time it took to open them (if metrics are enabled).
+   *
+   * @param partitionName partition name
+   * @param slice         the file slice to open readers for
+   * @return pair of the base file reader and the log record scanner
+   * @throws HoodieIOException if the readers could not be opened
+   */
+  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {
+    try {
+      HoodieTimer openTimer = new HoodieTimer().startTimer();
+
+      // Base file reader, along w/ the time it took to open it
+      Pair<HoodieFileReader, Long> baseReaderAndOpenMs = getBaseFileReader(slice, openTimer);
+      HoodieFileReader baseFileReader = baseReaderAndOpenMs.getKey();
+      final long baseFileOpenMs = baseReaderAndOpenMs.getValue();
+
+      // Log record scanner over the log files of the file slice, along w/ the time it took to open it
+      List<HoodieLogFile> sliceLogFiles = slice.getLogFiles().collect(Collectors.toList());
+      Pair<HoodieMetadataMergedLogRecordReader, Long> logScannerAndOpenMs = getLogRecordScanner(sliceLogFiles, partitionName);
+      HoodieMetadataMergedLogRecordReader logRecordScanner = logScannerAndOpenMs.getKey();
+      final long logScannerOpenMs = logScannerAndOpenMs.getValue();
+
+      final long totalOpenMs = baseFileOpenMs + logScannerOpenMs;
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SCAN_STR, totalOpenMs));
+
+      return Pair.of(baseFileReader, logRecordScanner);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
+    }
   }
 
   private Pair<HoodieFileReader, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
@@ -349,7 +456,14 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return validInstantTimestamps;
   }
 
-  public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles, String partitionName) {
+  /**
+   * Convenience overload which delegates to the 3-argument variant w/o a full-scan override
+   * (i.e. passing {@code Option.empty()}).
+   *
+   * @param logFiles      log files to create the scanner for
+   * @param partitionName metadata table partition the log files belong to
+   * @return pair of the log record scanner and a timing reported by the delegate
+   */
+  public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
+                                                                             String partitionName) {
+    return getLogRecordScanner(logFiles, partitionName, Option.empty());
+  }
+
+  public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
+                                                                             String partitionName,
+                                                                             Option<Boolean> allowFullScanOverride) {
     HoodieTimer timer = new HoodieTimer().startTimer();
     List<String> sortedLogFilePaths = logFiles.stream()
         .sorted(HoodieLogFile.getLogFileComparator())
@@ -363,6 +477,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
     String latestMetadataInstantTime = latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
 
+    boolean allowFullScan = allowFullScanOverride.orElseGet(() -> isFullScanAllowedForPartition(partitionName));
+
     // Load the schema
     Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
     HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
@@ -378,7 +494,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         .withDiskMapType(commonConfig.getSpillableDiskMapType())
         .withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
         .withLogBlockTimestamps(validInstantTimestamps)
-        .enableFullScan(metadataConfig.enableFullScan())
+        .allowFullScan(allowFullScan)
         .withPartition(partitionName)
         .build();
 
@@ -388,6 +504,21 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return Pair.of(logRecordScanner, logScannerOpenMs);
   }
 
+  // NOTE: Eager full-scan of the log-files is only ever allowed for the "files" partition
+  //       (subject to the metadata config). Other partitions (like "column_stats",
+  //       "bloom_filters") will have to be fetched t/h point-lookups
+  private boolean isFullScanAllowedForPartition(String partitionName) {
+    final boolean fullScanAllowed;
+    switch (partitionName) {
+      case PARTITION_NAME_FILES:
+        fullScanAllowed = metadataConfig.allowFullScan();
+        break;
+
+      case PARTITION_NAME_COLUMN_STATS:
+      case PARTITION_NAME_BLOOM_FILTERS:
+      default:
+        fullScanAllowed = false;
+        break;
+    }
+    return fullScanAllowed;
+  }
+
   /**
    * Returns a list of commits which were rolled back as part of a Rollback or Restore operation.
    *
@@ -433,6 +564,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   private synchronized void close(Pair<String, String> partitionFileSlicePair) {
+    // Evict the readers cached under this key first, then hand them over for closing;
+    // closeReader checks for null, so nothing needs to be cached under the key
     Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
         partitionReaders.remove(partitionFileSlicePair);
+    closeReader(readers);
+  }
+
+  private void closeReader(Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers) {
     if (readers != null) {
       try {
         if (readers.getKey() != null) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
index a024c9c3dc..d8c631a22a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
@@ -18,11 +18,13 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
@@ -31,19 +33,16 @@ import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.internal.schema.InternalSchema;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is
@@ -53,38 +52,16 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
 
   private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class);
 
-  // Set of all record keys that are to be read in memory
-  private Set<String> mergeKeyFilter;
-
+  // Private: instances are created t/h the Builder of this class. The constructor only forwards
+  // its arguments (along w/ a few fixed flags) to the HoodieMergedLogRecordScanner constructor;
+  // it no longer filters records by key nor triggers a scan by itself.
   private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, String partitionName,
                                               List<String> logFilePaths,
                                               Schema readerSchema, String latestInstantTime,
                                               Long maxMemorySizeInBytes, int bufferSize,
-                                              String spillableMapBasePath, Set<String> mergeKeyFilter,
+                                              String spillableMapBasePath,
                                               ExternalSpillableMap.DiskMapType diskMapType,
                                               boolean isBitCaskDiskMapCompressionEnabled,
-                                              Option<InstantRange> instantRange, boolean enableFullScan) {
+                                              Option<InstantRange> instantRange, boolean allowFullScan) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
-        spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false,
-        enableFullScan, Option.of(partitionName), InternalSchema.getEmptyInternalSchema());
-    this.mergeKeyFilter = mergeKeyFilter;
-    if (enableFullScan) {
-      performScan();
-    }
-  }
-
-  @Override
-  protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
-    if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(hoodieRecord.getRecordKey())) {
-      super.processNextRecord(hoodieRecord);
-    }
-  }
-
-  @Override
-  protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
-    if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(deleteRecord.getRecordKey())) {
-      super.processNextDeletedRecord(deleteRecord);
-    }
+        spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, false, allowFullScan, Option.of(partitionName), InternalSchema.getEmptyInternalSchema());
   }
 
   @Override
@@ -118,24 +95,37 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
    * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
    */
   public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordByKey(String key) {
+    checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
     return Collections.singletonList(Pair.of(key, Option.ofNullable((HoodieRecord) records.get(key))));
   }
 
+  /**
+   * Re-scans the log files for the given key prefixes and returns the (non-null) records
+   * collected by that scan.
+   *
+   * @param keyPrefixes key prefixes to scan the log files for
+   * @return records collected by the scan
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
+    // Clearing the records, scanning and reading the records back have to happen
+    // atomically (hence the lock held for the whole method), otherwise concurrent
+    // readers would race with each other and could crash when processing log block
+    // records as part of scan.
+    records.clear();
+
+    KeySpec prefixSpec = new KeySpec(keyPrefixes, false);
+    scanInternal(Option.of(prefixSpec));
+
+    return records.values().stream()
+        .filter(Objects::nonNull)
+        .map(scanned -> (HoodieRecord<HoodieMetadataPayload>) scanned)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Re-scans the log files for the given keys and returns an (optional) record for every one
+   * of them, in the order of {@code keys}.
+   *
+   * @param keys full record keys to look up
+   * @return pairs of key and the record found for it, or an empty option if there is none
+   */
+  @SuppressWarnings("unchecked")
   public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys) {
     // Following operations have to be atomic, otherwise concurrent
     // readers would race with each other and could crash when
     // processing log block records as part of scan.
-    records.clear();
-    scan(Option.of(keys));
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> metadataRecords = new ArrayList<>();
-    keys.forEach(entry -> {
-      if (records.containsKey(entry)) {
-        metadataRecords.add(Pair.of(entry, Option.ofNullable((HoodieRecord) records.get(entry))));
-      } else {
-        metadataRecords.add(Pair.of(entry, Option.empty()));
-      }
-    });
-    return metadataRecords;
+    // NOTE: The method itself is synchronized, so the lock on this reader is already
+    //       held at this point and no nested synchronized block is needed
+    records.clear();
+    scan(keys);
+    return keys.stream()
+        .map(key -> Pair.of(key, Option.ofNullable((HoodieRecord<HoodieMetadataPayload>) records.get(key))))
+        .collect(Collectors.toList());
   }
 
   @Override
@@ -147,9 +137,7 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
    * Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
    */
   public static class Builder extends HoodieMergedLogRecordScanner.Builder {
-    private Set<String> mergeKeyFilter = Collections.emptySet();
-    private boolean enableFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
-    private boolean enableInlineReading;
+    private boolean allowFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
 
     @Override
     public Builder withFileSystem(FileSystem fs) {
@@ -227,26 +215,21 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
       return this;
     }
 
-    public Builder withMergeKeyFilter(Set<String> mergeKeyFilter) {
-      this.mergeKeyFilter = mergeKeyFilter;
-      return this;
-    }
-
     public Builder withLogBlockTimestamps(Set<String> validLogBlockTimestamps) {
       withInstantRange(Option.of(new ExplicitMatchRange(validLogBlockTimestamps)));
       return this;
     }
 
+    /**
+     * Sets whether the reader being built is allowed to do a full scan of the log files.
+     *
+     * @param allowFullScan whether full scan is allowed
+     * @return this builder
+     */
-    public Builder enableFullScan(boolean enableFullScan) {
-      this.enableFullScan = enableFullScan;
+    public Builder allowFullScan(boolean allowFullScan) {
+      this.allowFullScan = allowFullScan;
       return this;
     }
 
     @Override
     public HoodieMetadataMergedLogRecordReader build() {
+      // NOTE: Arguments are positional and have to follow the order of the (private) constructor's parameters
       return new HoodieMetadataMergedLogRecordReader(fs, basePath, partitionName, logFilePaths, readerSchema,
-          latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter,
-          diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, enableFullScan);
+          latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath,
+          diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, allowFullScan);
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 01618c5f37..1b33de795b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -75,8 +75,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.TypeUtils.unsafeCast;
-import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
 import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
+import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
@@ -391,7 +391,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   }
 
   @Override
-  public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
+  public Option<IndexedRecord> getInsertValue(Schema schemaIgnored, Properties propertiesIgnored) throws IOException {
     if (key == null) {
       return Option.empty();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index e206072866..a059b57845 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -24,7 +24,9 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 
@@ -159,6 +161,17 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
   Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final List<Pair<String, String>> partitionNameFileNameList, final String columnName)
       throws HoodieMetadataException;
 
+  /**
+   * Fetches records by key prefixes. Key prefixes passed in are expected to match the keys as they are stored in the Metadata table partitions. E.g., in case of the col stats partition,
+   * the actual keys in the metadata partition are the encoded values of column name, partition name and file name. So, the key prefixes passed to this method are expected to be encoded already.
+   *
+   * @param keyPrefixes list of key prefixes to look the records up by.
+   * @param partitionName name of the partition in the metadata table to look the records up in.
+   * @return {@link HoodieData} of {@link HoodieRecord}s matching the passed-in key prefixes.
+   */
+  HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
+                                                                          String partitionName);
+
   /**
    * Get the instant time to which the metadata is synced w.r.t data timeline.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index b6d9224b34..21337ceaeb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -105,9 +105,9 @@ public class HoodieTableMetadataUtil {
 
   private static final Logger LOG = LogManager.getLogger(HoodieTableMetadataUtil.class);
 
-  protected static final String PARTITION_NAME_FILES = "files";
-  protected static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
-  protected static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
+  public static final String PARTITION_NAME_FILES = "files";
+  public static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
+  public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
 
   /**
    * Collects {@link HoodieColumnRangeMetadata} for the provided collection of records, pretending
@@ -815,7 +815,7 @@ public class HoodieTableMetadataUtil {
    * @param path
    * @return
    */
-  static String getPartition(@Nonnull String path) {
+  public static String getPartition(@Nonnull String path) {
     return EMPTY_PARTITION_NAME.equals(path) ? NON_PARTITIONED_NAME : path;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/util/LazyRef.java b/hudi-common/src/main/java/org/apache/hudi/util/LazyRef.java
new file mode 100644
index 0000000000..e4c4f881fc
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/util/LazyRef.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import java.util.function.Supplier;
+
+// TODO convert to java-doc: holder of a value that is either given up front (eager) or computed by a Supplier on first get() (lazy)
+public class LazyRef<T> {
+
+  private volatile boolean initialized; // written last in get(); volatile, so a reader that sees true also sees the assigned ref
+
+  private Supplier<T> initializer; // null for eager refs; nulled out in get() once it has been invoked
+  private T ref; // the held value; may legitimately be null, which is why a separate initialized flag is tracked
+  // Lazy variant: keeps the supplier and leaves the value unset until the first get()
+  private LazyRef(Supplier<T> initializer) {
+    this.initializer = initializer;
+    this.ref = null;
+    this.initialized = false;
+  }
+  // Eager variant: the value is already known, so no supplier is kept and get() never takes the lock
+  private LazyRef(T ref) {
+    this.initializer = null;
+    this.ref = ref;
+    this.initialized = true;
+  }
+  // Returns the held value, invoking the initializer under the instance lock on first access
+  public T get() {
+    if (!initialized) { // unsynchronized check: skips the lock once the value has been set
+      synchronized (this) {
+        if (!initialized) { // re-check under the lock: another thread may have initialized while this one waited
+          this.ref = initializer.get(); // if this throws, initialized stays false and the next get() invokes it again
+          this.initializer = null; // drop the supplier reference; it is not used again
+          initialized = true; // set only after ref has been assigned
+        }
+      }
+    }
+
+    return ref;
+  }
+  // Creates a reference whose value is computed by the given supplier on the first get()
+  public static <T> LazyRef<T> lazy(Supplier<T> initializer) {
+    return new LazyRef<>(initializer);
+  }
+  // Creates an already-initialized reference wrapping the given value
+  public static <T> LazyRef<T> eager(T ref) {
+    return new LazyRef<>(ref);
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 71917f9f56..4fa53bb41f 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -2038,7 +2038,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
    * Utility to convert the given iterator to a List.
    */
   private static List<IndexedRecord> getRecords(HoodieDataBlock dataBlock) {
-    ClosableIterator<IndexedRecord> itr = dataBlock.getRecordItr();
+    ClosableIterator<IndexedRecord> itr = dataBlock.getRecordIterator();
 
     List<IndexedRecord> elements = new ArrayList<>();
     itr.forEachRemaining(elements::add);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index ea4417033a..d176a3755f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -17,22 +17,39 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, tryUnpackNonNullVal}
-import org.apache.hudi.metadata.{HoodieMetadataPayload, MetadataPartitionType}
+import org.apache.avro.Schema.Parser
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, metadataRecordSchemaString, metadataRecordStructType, tryUnpackNonNullVal}
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
+import org.apache.hudi.common.util.hash.ColumnIndexID
+import org.apache.hudi.data.HoodieJavaRDD
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, HoodieUnsafeRDDUtils, Row, SparkSession}
 
+import scala.collection.JavaConverters._
 import scala.collection.immutable.TreeSet
 
 /**
  * Mixin trait abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index,
  * providing convenient interfaces to read it, transpose, etc
  */
-trait ColumnStatsIndexSupport {
+trait ColumnStatsIndexSupport extends SparkAdapterSupport {
 
-  def readColumnStatsIndex(spark: SparkSession, metadataTablePath: String): DataFrame = {
+  def readColumnStatsIndex(spark: SparkSession,
+                           tableBasePath: String,
+                           metadataConfig: HoodieMetadataConfig,
+                           targetColumns: Seq[String] = Seq.empty): DataFrame = {
     val targetColStatsIndexColumns = Seq(
       HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
       HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
@@ -43,11 +60,17 @@ trait ColumnStatsIndexSupport {
       (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
         s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
 
-    // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
-    val metadataTableDF = spark.read.format("org.apache.hudi")
-      .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+    val metadataTableDF: DataFrame = {
+      // NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched
+      //       by only fetching Column Stats Index records pertaining to the requested columns.
+      //       Otherwise we fallback to read whole Column Stats Index
+      if (targetColumns.nonEmpty) {
+        readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath)
+      } else {
+        readFullColumnStatsIndexInternal(spark, tableBasePath)
+      }
+    }
 
-    // TODO filter on (column, partition) prefix
     val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
       .select(requiredMetadataIndexColumns.map(col): _*)
 
@@ -105,34 +128,40 @@ trait ColumnStatsIndexSupport {
     //       of the transposed table
     val sortedColumns = TreeSet(targetColumns: _*)
 
+    val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
+    val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
+    val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
+    val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+    val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
+
     val transposedRDD = colStatsDF.rdd
-      .filter(row => sortedColumns.contains(row.getString(colStatsSchemaOrdinalsMap("columnName"))))
+      .filter(row => sortedColumns.contains(row.getString(colNameOrdinal)))
       .map { row =>
-        val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](colStatsSchemaOrdinalsMap("minValue")))
-        val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](colStatsSchemaOrdinalsMap("maxValue")))
+        val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal))
+        val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal))
 
-        val colName = row.getString(colStatsSchemaOrdinalsMap("columnName"))
+        val colName = row.getString(colNameOrdinal)
         val colType = tableSchemaFieldMap(colName).dataType
 
         val rowValsSeq = row.toSeq.toArray
 
-        rowValsSeq(colStatsSchemaOrdinalsMap("minValue")) = deserialize(minValue, colType)
-        rowValsSeq(colStatsSchemaOrdinalsMap("maxValue")) = deserialize(maxValue, colType)
+        rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
+        rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
 
         Row(rowValsSeq:_*)
       }
-      .groupBy(r => r.getString(colStatsSchemaOrdinalsMap("fileName")))
+      .groupBy(r => r.getString(fileNameOrdinal))
       .foldByKey(Seq[Row]()) {
         case (_, columnRows) =>
           // Rows seq is always non-empty (otherwise it won't be grouped into)
-          val fileName = columnRows.head.get(colStatsSchemaOrdinalsMap("fileName"))
+          val fileName = columnRows.head.get(fileNameOrdinal)
           val coalescedRowValuesSeq = columnRows.toSeq
             // NOTE: It's crucial to maintain appropriate ordering of the columns
             //       matching table layout
-            .sortBy(_.getString(colStatsSchemaOrdinalsMap("columnName")))
+            .sortBy(_.getString(colNameOrdinal))
             .foldLeft(Seq[Any](fileName)) {
               case (acc, columnRow) =>
-                acc ++ Seq("minValue", "maxValue", "nullCount").map(ord => columnRow.get(colStatsSchemaOrdinalsMap(ord)))
+                acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnRow.get(ord))
             }
 
           Seq(Row(coalescedRowValuesSeq:_*))
@@ -147,6 +176,49 @@ trait ColumnStatsIndexSupport {
 
     spark.createDataFrame(transposedRDD, indexSchema)
   }
+
+  private def readFullColumnStatsIndexInternal(spark: SparkSession, tableBasePath: String) = { // used by readColumnStatsIndex when no target columns are given
+    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
+    // Read the whole Column Stats Index partition of the Metadata Table into Spark's [[DataFrame]] through the Hudi datasource
+    spark.read.format("org.apache.hudi")
+      .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+  }
+
+  private def readColumnStatsIndexForColumnsInternal(spark: SparkSession, targetColumns: Seq[String], metadataConfig: HoodieMetadataConfig, tableBasePath: String) = {
+    val ctx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+
+    // Read only the requested columns' slice of the Column Stats Index (CSI) into Spark's [[DataFrame]] by
+    //    - Fetching the records from CSI by key-prefixes (Base64-encoded column-name ids)
+    //    - Deserializing fetched records (Avro) into [[InternalRow]]s
+    //    - Composing [[DataFrame]] with the Metadata Table record schema
+    val metadataTableDF = {
+      val metadataTable = HoodieTableMetadata.create(ctx, metadataConfig, tableBasePath, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
+
+      // TODO encoding of the column names into key prefixes should be done internally w/in HoodieBackedTableMetadata
+      val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
+
+      val recordsRDD: RDD[HoodieRecord[HoodieMetadataPayload]] =
+        HoodieJavaRDD.getJavaRDD(
+          metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+        )
+
+      val catalystRowsRDD: RDD[InternalRow] = recordsRDD.mapPartitions { it =>
+        val metadataRecordSchema = new Parser().parse(metadataRecordSchemaString) // parsed once per partition from its string form; presumably to avoid shipping the Avro Schema in the closure
+        val converter = AvroConversionUtils.createAvroToInternalRowConverter(metadataRecordSchema, metadataRecordStructType)
+
+        it.map { record =>
+          // Schema and props are ignored when generating the metadata record from the payload (hence the nulls below);
+          // instead, the file-system, bloom-filter, or column-stats metadata carried by the payload is used directly
+          toScalaOption(record.getData.getInsertValue(null, null))
+            .flatMap(avroRecord => converter(avroRecord.asInstanceOf[GenericRecord]))
+            .orNull // NOTE(review): emits a null row when the payload or the converter yields no value; confirm downstream tolerates it
+        }
+      }
+
+      HoodieUnsafeRDDUtils.createDataFrame(spark, catalystRowsRDD, metadataRecordStructType)
+    }
+    metadataTableDF
+  }
 }
 
 object ColumnStatsIndexSupport {
@@ -156,6 +228,9 @@ object ColumnStatsIndexSupport {
   private val COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME = "maxValue"
   private val COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME = "num_nulls"
 
+  private val metadataRecordSchemaString: String = HoodieMetadataRecord.SCHEMA$.toString
+  private val metadataRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$)
+
   /**
    * @VisibleForTesting
    */
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f79ba96d89..c33c6dce6d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapred.JobConf
 
 import org.apache.hudi.HoodieBaseRelation.getPartitionPath
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -68,7 +68,8 @@ case class HoodieTableState(tablePath: String,
                             recordKeyField: String,
                             preCombineFieldOpt: Option[String],
                             usesVirtualKeys: Boolean,
-                            recordPayloadClassName: String)
+                            recordPayloadClassName: String,
+                            metadataConfig: HoodieMetadataConfig)
 
 /**
  * Hoodie BaseRelation which extends [[PrunedFilteredScan]].
@@ -135,7 +136,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     val internalSchemaFromMeta = try {
       schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
     } catch {
-      case _ => InternalSchema.getEmptyInternalSchema
+      case _: Exception => InternalSchema.getEmptyInternalSchema
     }
     (avroSchema, internalSchemaFromMeta)
   }
@@ -339,7 +340,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       recordKeyField = recordKeyField,
       preCombineFieldOpt = preCombineFieldOpt,
       usesVirtualKeys = !tableConfig.populateMetaFields(),
-      recordPayloadClassName = tableConfig.getPayloadClass
+      recordPayloadClassName = tableConfig.getPayloadClass,
+      metadataConfig = fileIndex.metadataConfig
     )
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 82cd1f4019..2cea67d275 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
@@ -195,15 +195,14 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of pruned (data-skipped) candidate base-files' names
    */
   private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
-    val fs = metaClient.getFs
-    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
-
-    if (!isDataSkippingEnabled || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
+    if (!isDataSkippingEnabled || queryFilters.isEmpty || !HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
+      .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
       Option.empty
     } else {
-      val colStatsDF: DataFrame = readColumnStatsIndex(spark, metadataTablePath)
       val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
 
+      val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns)
+
       // Persist DF to avoid re-computing column statistics unraveling
       withPersistence(colStatsDF) {
         val transposedColStatsDF: DataFrame = transposeColumnStatsIndex(spark, colStatsDF, queryReferencedColumns, schema)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index d40cf49434..c0c47cff42 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,7 +23,7 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
 import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -324,17 +324,23 @@ private object HoodieMergeOnReadRDD {
     val fs = FSUtils.getFs(tablePath, hadoopConf)
 
     if (HoodieTableMetadata.isMetadataTable(tablePath)) {
-      val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
+      val metadataConfig = tableState.metadataConfig
       val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
       val metadataTable = new HoodieBackedTableMetadata(
         new HoodieLocalEngineContext(hadoopConf), metadataConfig,
         dataTableBasePath,
         hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
 
+      // We have to force full-scan for the MT log record reader, to make sure
+      // we can iterate over all of the partitions, since by default some of the partitions (Column Stats,
+      // Bloom Filter) are in "point-lookup" mode
+      val forceFullScan = true
+
       // NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level
       //       of indirection among MT partitions)
       val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath)
-      metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath).getLeft
+      metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)))
+        .getLeft
     } else {
       val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f1948d80c8..278454dbc3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
@@ -360,7 +361,7 @@ object HoodieSparkSqlWriter {
         None
       }
     } catch {
-      case _ => None
+      case _: Exception => None
     }
   }
 
@@ -568,12 +569,6 @@ object HoodieSparkSqlWriter {
     (syncHiveSuccess, common.util.Option.ofNullable(instantTime))
   }
 
-  def toProperties(params: Map[String, String]): TypedProperties = {
-    val props = new TypedProperties()
-    params.foreach(kv => props.setProperty(kv._1, kv._2))
-    props
-  }
-
   private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
                               operation: WriteOperationType, fs: FileSystem): Unit = {
     if (mode == SaveMode.Append && tableExists) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 81199dbca9..d9fe010fe7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -91,12 +91,6 @@ object HoodieWriterUtils {
     Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
   }
 
-  def toProperties(params: Map[String, String]): TypedProperties = {
-    val props = new TypedProperties()
-    params.foreach(kv => props.setProperty(kv._1, kv._2))
-    props
-  }
-
   /**
    * Get the partition columns to stored to hoodie.properties.
    * @param parameters
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index a06ffffe50..1305323bd1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -308,7 +308,7 @@ object SparkHoodieTableFileIndex {
   }
 
   private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = {
-    configProperties.asScala(QUERY_TYPE.key) match {
+    configProperties.asScala.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue) match {
       case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT
       case QUERY_TYPE_INCREMENTAL_OPT_VAL => HoodieTableQueryType.INCREMENTAL
       case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => HoodieTableQueryType.READ_OPTIMIZED
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index 4e46233c35..5a2b30fae1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -154,6 +154,7 @@ class HoodieStreamSource(
     } else {
       // Consume the data between (startCommitTime, endCommitTime]
       val incParams = parameters ++ Map(
+        DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
         DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startCommitTime(startOffset),
         DataSourceReadOptions.END_INSTANTTIME.key -> endOffset.commitTime
       )
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index a11c2f73f9..3090500941 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -372,7 +372,11 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
     val props = Map[String, String](
       "path" -> basePath,
       QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
-      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+      // NOTE: Metadata Table has to be enabled on the read path as well
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true"
     )
 
     val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 5d10a1d1f4..8f827f13d2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -22,11 +22,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
 import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ParquetUtils
 import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
-import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
 import org.apache.spark.sql._
@@ -34,6 +34,8 @@ import org.apache.spark.sql.functions.typedLit
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
@@ -69,8 +71,9 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
     cleanupSparkContexts()
   }
 
-  @Test
-  def testMetadataColumnStatsIndex(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testMetadataColumnStatsIndex(forceFullLogScan: Boolean): Unit = {
     val opts = Map(
       "hoodie.insert.shuffle.parallelism" -> "4",
       "hoodie.upsert.shuffle.parallelism" -> "4",
@@ -80,6 +83,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
       HoodieMetadataConfig.ENABLE.key -> "true",
       HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
       HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
+      HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> forceFullLogScan.toString,
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
     )
 
@@ -104,9 +108,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
 
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
-    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
+    val metadataConfig = HoodieMetadataConfig.newBuilder()
+      .fromProperties(toProperties(opts))
+      .build()
 
-    val colStatsDF = readColumnStatsIndex(spark, metadataTablePath)
+    val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
     val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
 
     val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
@@ -146,7 +152,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
 
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
-    val updatedColStatsDF = readColumnStatsIndex(spark, metadataTablePath)
+    val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
     val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
 
     val expectedColStatsIndexUpdatedDF =
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
index 188ba6745c..17715627fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
@@ -51,17 +51,20 @@ class TestLayoutOptimization extends HoodieClientTestBase {
       .add("c7", BinaryType)
       .add("c8", ByteType)
 
+  val metadataOpts = Map(
+    HoodieMetadataConfig.ENABLE.key -> "true",
+    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+  )
+
   val commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
     "hoodie.bulkinsert.shuffle.parallelism" -> "4",
-    HoodieMetadataConfig.ENABLE.key -> "true",
-    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
     DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
-  )
+  ) ++ metadataOpts
 
   @BeforeEach
   override def setUp() {
@@ -134,6 +137,7 @@ class TestLayoutOptimization extends HoodieClientTestBase {
     val readDfSkip =
       spark.read
         .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
+        .options(metadataOpts)
         .format("hudi")
         .load(basePath)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index ead6358f23..d648471351 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -49,19 +49,21 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
   def testReadability(): Unit = {
     val dataGen = new HoodieTestDataGenerator()
 
-    val opts: Map[String, String] = commonOpts ++ Map(
+    val metadataOpts: Map[String, String] = Map(
       HoodieMetadataConfig.ENABLE.key -> "true",
       HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
-      HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true"
     )
 
+    val combinedOpts: Map[String, String] = commonOpts ++ metadataOpts ++
+      Map(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1")
+
     // Insert records
     val newRecords = dataGen.generateInserts("001", 100)
     val newRecordsDF = parseRecords(recordsToStrings(newRecords).asScala)
 
     newRecordsDF.write.format(hudi)
-      .options(opts)
+      .options(combinedOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -71,13 +73,13 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
     val updatedRecordsDF = parseRecords(recordsToStrings(updatedRecords).asScala)
 
     updatedRecordsDF.write.format(hudi)
-      .options(opts)
+      .options(combinedOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
       .save(basePath)
 
     // Files partition of MT
-    val filesPartitionDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
+    val filesPartitionDF = spark.read.options(metadataOpts).format(hudi).load(s"$basePath/.hoodie/metadata/files")
 
     // Smoke test
     filesPartitionDF.show()
@@ -95,7 +97,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
     assertEquals(expectedKeys, keys)
 
     // Column Stats Index partition of MT
-    val colStatsDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
+    val colStatsDF = spark.read.options(metadataOpts).format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
 
     // Smoke test
     colStatsDF.show()