Posted to commits@hudi.apache.org by ak...@apache.org on 2023/01/19 17:10:23 UTC

[hudi] branch master updated: [HUDI-4911][HUDI-3301] Fixing `HoodieMetadataLogRecordReader` to avoid flushing cache for every lookup (#6782)

This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f8028a400eb [HUDI-4911][HUDI-3301] Fixing `HoodieMetadataLogRecordReader` to avoid flushing cache for every lookup (#6782)
f8028a400eb is described below

commit f8028a400ebe7a64d7745fae90e769306a8a07a7
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Thu Jan 19 09:10:13 2023 -0800

    [HUDI-4911][HUDI-3301] Fixing `HoodieMetadataLogRecordReader` to avoid flushing cache for every lookup (#6782)
    
    Currently, HoodieMetadataLogRecordReader flushes its cache for every lookup it performs, which has led to repeated instances of poor performance, since the Metadata Table (MT) had to be re-scanned over and over again even though nothing had actually changed.
    
    This PR rectifies that. Additionally, it:
    
    - Makes sure LogRecordScanner is not extended for the MT (there's no reason for that; instead, its configuration is simply expanded to accommodate the MT use case)
    - Refines and cleans up the API provided by HoodieMetadataLogRecordReader
    - Avoids unnecessary locking
    
    A short sketch of the resulting incremental-lookup caching follows below.
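    
    As a rough illustration, here is a minimal standalone sketch of the caching
    strategy this change introduces (simplified, hypothetical class and helper --
    not the actual Hudi classes or their builder wiring): records fetched by
    earlier lookups stay cached, and only keys or key-prefixes not yet covered
    trigger another pass over the delta-log files.
    
        import java.util.HashMap;
        import java.util.HashSet;
        import java.util.List;
        import java.util.Map;
        import java.util.Set;
        import java.util.stream.Collectors;
        
        // Simplified model of the incremental-lookup caching (illustrative only).
        class PrefixCachingScanner {
          // Stands in for the ExternalSpillableMap of merged records.
          private final Map<String, String> records = new HashMap<>();
          // Prefixes already scanned, so the same prefix is never re-scanned.
          private final Set<String> scannedPrefixes = new HashSet<>();
        
          void scanByFullKeys(List<String> keys) {
            // Only look up keys that are not already materialized in the cache.
            List<String> missing = keys.stream()
                .filter(key -> !records.containsKey(key))
                .collect(Collectors.toList());
            if (!missing.isEmpty()) {
              scanLogFiles(missing);
            }
          }
        
          void scanByKeyPrefixes(List<String> keyPrefixes) {
            // Skip prefixes already covered by a previous prefix scan; cached
            // records alone can't tell whether a whole prefix was scanned.
            List<String> missing = keyPrefixes.stream()
                .filter(prefix -> scannedPrefixes.stream().noneMatch(prefix::startsWith))
                .collect(Collectors.toList());
            if (!missing.isEmpty()) {
              scanLogFiles(missing);
              scannedPrefixes.addAll(missing);
            }
          }
        
          private void scanLogFiles(List<String> keysOrPrefixes) {
            // Placeholder for the actual delta-log scan; results land in `records`.
          }
        }
    
    In the actual change, the merged records live in an ExternalSpillableMap and
    both scans go through scanInternal(...), as seen in the
    HoodieMergedLogRecordScanner diff below.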
---
 .../cli/commands/TestHoodieLogFileCommand.java     |   3 +-
 .../hudi/cli/integ/ITTestRepairsCommand.java       |   5 +-
 .../common/table/log/HoodieFileSliceReader.java    |   2 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   6 +-
 .../apache/hudi/io/HoodieSortedMergeHandle.java    |   3 +-
 .../HoodieLogCompactionPlanGenerator.java          |   3 +-
 .../functional/TestHoodieBackedMetadata.java       |  15 +-
 .../functional/TestHoodieBackedTableMetadata.java  |  15 +-
 .../TestHoodieClientOnMergeOnReadStorage.java      |   4 +-
 .../hudi/testutils/HoodieClientTestHarness.java    |   2 +-
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |   3 +-
 .../table/log/AbstractHoodieLogRecordReader.java   | 204 +++++++------
 .../table/log/HoodieMergedLogRecordScanner.java    | 206 ++++++++++----
 .../table/log/HoodieUnMergedLogRecordScanner.java  |  13 +-
 .../apache/hudi/common/util/CollectionUtils.java   |   4 +-
 .../util/collection/ExternalSpillableMap.java      |   2 +
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  75 +++--
 .../metadata/HoodieMetadataLogRecordReader.java    | 238 ++++++++++++++++
 .../HoodieMetadataMergedLogRecordReader.java       | 254 -----------------
 .../common/functional/TestHoodieLogFormat.java     | 315 ++++++++++++++++++---
 .../apache/hudi/common/model/TestHoodieRecord.java |   3 +-
 .../hudi/common/testutils/SchemaTestUtil.java      |  25 +-
 .../common/util/collection/TestBitCaskDiskMap.java |  26 +-
 .../util/collection/TestExternalSpillableMap.java  |  27 +-
 .../util/collection/TestRocksDbBasedMap.java       |   3 +-
 .../common/util/collection/TestRocksDbDiskMap.java |  14 +-
 .../src/main/scala/org/apache/hudi/Iterators.scala |  55 ++--
 .../sql/hudi/procedure/TestRepairsProcedure.scala  |   5 +-
 28 files changed, 950 insertions(+), 580 deletions(-)

diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 21e6218dbe2..aff12422f6a 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -194,7 +194,8 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
               .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
               .withFileId("test-log-fileid1").overBaseCommit(INSTANT_TIME).withFs(fs).withSizeThreshold(500).build();
 
-      List<HoodieRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
+      SchemaTestUtil testUtil = new SchemaTestUtil();
+      List<HoodieRecord> records1 = testUtil.generateHoodieTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
index 69db47136e9..a95ed9ff778 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
@@ -86,8 +86,9 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
 
     // generate 200 records
-    HoodieRecord[] hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]);
-    HoodieRecord[] hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    HoodieRecord[] hoodieRecords1 = testUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]);
+    HoodieRecord[] hoodieRecords2 = testUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]);
 
     // generate duplicates
     HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
index 35ca3d6d5ad..eb4e18366d7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
@@ -43,7 +43,7 @@ public class HoodieFileSliceReader<T> implements Iterator<HoodieRecord<T>> {
       Iterator<HoodieRecord> baseIterator = baseFileReader.get().getRecordIterator(schema);
       while (baseIterator.hasNext()) {
         scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
-            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionName(), false));
+            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false));
       }
     }
     return new HoodieFileSliceReader(scanner.iterator());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index b6364aede14..0460f88101c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -410,10 +410,10 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
 
       if (keyToNewRecords instanceof ExternalSpillableMap) {
         ((ExternalSpillableMap) keyToNewRecords).close();
-      } else {
-        keyToNewRecords.clear();
       }
-      writtenRecordKeys.clear();
+
+      keyToNewRecords = null;
+      writtenRecordKeys = null;
 
       if (fileWriter != null) {
         fileWriter.close();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 18fe6a344db..3d3a7308bb3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -117,13 +117,14 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I,
             writeRecord(hoodieRecord, Option.of(hoodieRecord), writeSchema, config.getProps());
           }
           insertRecordsWritten++;
+          writtenRecordKeys.add(hoodieRecord.getRecordKey());
         }
       } catch (IOException e) {
         throw new HoodieUpsertException("Failed to close UpdateHandle", e);
       }
     }
+
     newRecordKeysSorted.clear();
-    keyToNewRecords.clear();
 
     return super.close();
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index e2ed1a06ac9..6a5f160f6b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.compact.LogCompactionExecutionHelper;
@@ -94,7 +93,7 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I,
         .withUseScanV2(true)
         .withRecordMerger(writeConfig.getRecordMerger())
         .build();
-    scanner.scanInternal(Option.empty(), true);
+    scanner.scan(true);
     int totalBlocks = scanner.getCurrentInstantLogBlocks().size();
     LOG.info("Total blocks seen are " + totalBlocks);
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index de27d68e3f5..84e6d342883 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -87,7 +87,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieAvroHFileReader;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
-import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
 import org.apache.hudi.metadata.HoodieMetadataMetrics;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -1099,7 +1099,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     if (enableMetaFields) {
       schema = HoodieAvroUtils.addMetadataFields(schema);
     }
-    HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder()
+    HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder()
         .withFileSystem(metadataMetaClient.getFs())
         .withBasePath(metadataMetaClient.getBasePath())
         .withLogFilePaths(logFilePaths)
@@ -1112,14 +1112,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
         .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
         .build();
 
-    assertDoesNotThrow(() -> {
-      logRecordReader.scan();
-    }, "Metadata log records materialization failed");
-
-    for (Map.Entry<String, HoodieRecord> entry : logRecordReader.getRecords().entrySet()) {
-      assertFalse(entry.getKey().isEmpty());
-      assertFalse(entry.getValue().getRecordKey().isEmpty());
-      assertEquals(entry.getKey(), entry.getValue().getRecordKey());
+    for (HoodieRecord<? extends HoodieRecordPayload> entry : logRecordReader.getRecords()) {
+      assertFalse(entry.getRecordKey().isEmpty());
+      assertEquals(entry.getKey().getRecordKey(), entry.getRecordKey());
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 3675f7e8f71..b1348509dfe 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -39,7 +39,7 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieAvroHFileReader;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
-import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -379,7 +379,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
    */
   private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClient, List<String> logFilePaths, String latestCommitTimestamp) {
     Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-    HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder()
+    HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder()
         .withFileSystem(metadataMetaClient.getFs())
         .withBasePath(metadataMetaClient.getBasePath())
         .withLogFilePaths(logFilePaths)
@@ -392,14 +392,9 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
         .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
         .build();
 
-    assertDoesNotThrow(() -> {
-      logRecordReader.scan();
-    }, "Metadata log records materialization failed");
-
-    for (Map.Entry<String, HoodieRecord> entry : logRecordReader.getRecords().entrySet()) {
-      assertFalse(entry.getKey().isEmpty());
-      assertFalse(entry.getValue().getRecordKey().isEmpty());
-      assertEquals(entry.getKey(), entry.getValue().getRecordKey());
+    for (HoodieRecord<?> entry : logRecordReader.getRecords()) {
+      assertFalse(entry.getRecordKey().isEmpty());
+      assertEquals(entry.getKey().getRecordKey(), entry.getRecordKey());
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 32a7fbd7100..c623e13bddb 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -447,7 +447,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
             .withUseScanV2(true)
             .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
             .build();
-        scanner.scanInternal(Option.empty(), true);
+        scanner.scan(true);
         List<String> prevInstants = scanner.getValidBlockInstants();
         HoodieUnMergedLogRecordScanner scanner2 = HoodieUnMergedLogRecordScanner.newBuilder()
             .withFileSystem(metaClient.getFs())
@@ -461,7 +461,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
             .withUseScanV2(true)
             .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
             .build();
-        scanner2.scanInternal(Option.empty(), true);
+        scanner2.scan(true);
         List<String> currentInstants = scanner2.getValidBlockInstants();
         assertEquals(prevInstants, currentInstants);
       });
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 9d305777adf..7cebf894a2a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -228,11 +228,11 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness {
       LOG.info("Clearing sql context cache of spark-session used in previous test-case");
       sqlContext.clearCache();
       sqlContext = null;
+      sparkSession = null;
     }
 
     if (jsc != null) {
       LOG.info("Closing spark context used in previous test-case");
-      jsc.close();
       jsc.stop();
       jsc = null;
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index cef86109ff8..c00bf7ea997 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -51,6 +51,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -342,7 +343,7 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
             .collect(Collectors.toMap(Pair::getKey, p -> p.getRight().get()));
 
     Set<Path> missingPartitionPaths =
-        CollectionUtils.diff(partitionPaths, cachedPartitionPaths.keySet());
+        CollectionUtils.diffSet(new HashSet<>(partitionPaths), cachedPartitionPaths.keySet());
 
     // NOTE: We're constructing a mapping of absolute form of the partition-path into
     //       its relative one, such that we don't need to reconstruct these again later on
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index ca53802a3af..83172ecb7ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -28,19 +28,15 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
-import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.ClosableIteratorWithSchema;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -77,6 +73,7 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetada
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.COMMAND_BLOCK;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * Implements logic to scan log blocks and expose valid and deleted log records to subclass implementation. Subclass is
@@ -102,13 +99,16 @@ public abstract class AbstractHoodieLogRecordReader {
   protected final HoodieTableMetaClient hoodieTableMetaClient;
   // Merge strategy to use when combining records from log
   private final String payloadClassFQN;
-  // preCombine field
+  // Record's key/partition-path fields
+  private final String recordKeyField;
+  private final Option<String> partitionPathFieldOpt;
+  // Partition name override
+  private final Option<String> partitionNameOverrideOpt;
+  // Pre-combining field
   protected final String preCombineField;
   // Stateless component for merging records
   protected final HoodieRecordMerger recordMerger;
   private final TypedProperties payloadProps;
-  // simple key gen fields
-  private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
   // Log File Paths
   protected final List<String> logFilePaths;
   // Read Lazily flag
@@ -127,9 +127,7 @@ public abstract class AbstractHoodieLogRecordReader {
   // Total log files read - for metrics
   private AtomicLong totalLogFiles = new AtomicLong(0);
   // Internal schema, used to support full schema evolution.
-  private InternalSchema internalSchema;
-  // Hoodie table path.
-  private final String path;
+  private final InternalSchema internalSchema;
   // Total log blocks read - for metrics
   private AtomicLong totalLogBlocks = new AtomicLong(0);
   // Total log records read - for metrics
@@ -142,35 +140,26 @@ public abstract class AbstractHoodieLogRecordReader {
   private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
   // Enables full scan of log records
   protected final boolean forceFullScan;
-  private int totalScannedLogFiles;
   // Progress
   private float progress = 0.0f;
-  // Partition name
-  private Option<String> partitionName;
   // Populate meta fields for the records
-  private boolean populateMetaFields = true;
+  private final boolean populateMetaFields;
   // Record type read from log block
   protected final HoodieRecordType recordType;
   // Collect all the block instants after scanning all the log files.
-  private List<String> validBlockInstants = new ArrayList<>();
+  private final List<String> validBlockInstants = new ArrayList<>();
   // Use scanV2 method.
-  private boolean useScanV2;
-
-  protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
-                                          Schema readerSchema,
-                                          String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
-                                          int bufferSize, Option<InstantRange> instantRange,
-                                          boolean withOperationField, HoodieRecordMerger recordMerger) {
-    this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
-        instantRange, withOperationField, true, Option.empty(), InternalSchema.getEmptyInternalSchema(), false, recordMerger);
-  }
+  private final boolean useScanV2;
 
   protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
                                           Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
                                           boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
                                           boolean withOperationField, boolean forceFullScan,
-                                          Option<String> partitionName, InternalSchema internalSchema,
-                                          boolean useScanV2, HoodieRecordMerger recordMerger) {
+                                          Option<String> partitionNameOverride,
+                                          InternalSchema internalSchema,
+                                          Option<String> keyFieldOverride,
+                                          boolean useScanV2,
+                                          HoodieRecordMerger recordMerger) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
     this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
@@ -195,44 +184,48 @@ public abstract class AbstractHoodieLogRecordReader {
     this.withOperationField = withOperationField;
     this.forceFullScan = forceFullScan;
     this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
-    this.path = basePath;
     this.useScanV2 = useScanV2;
 
-    // Key fields when populate meta fields is disabled (that is, virtual keys enabled)
-    if (!tableConfig.populateMetaFields()) {
-      this.populateMetaFields = false;
-      this.simpleKeyGenFields = Option.of(
-          Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));
-    }
-    this.partitionName = partitionName;
-    this.recordType = recordMerger.getRecordType();
-  }
+    if (keyFieldOverride.isPresent()) {
+      // NOTE: This branch specifically is leveraged handling Metadata Table
+      //       log-block merging sequence. Here we do
+      //         - Override the record-key field (which isn't configured t/h table-config)
+      //         - Override partition-path value w/ static "partition-name" (in MT all partitions
+      //         are static, like "files", "col_stats", etc)
+      checkState(partitionNameOverride.isPresent());
 
-  protected String getKeyField() {
-    if (this.populateMetaFields) {
-      return HoodieRecord.RECORD_KEY_METADATA_FIELD;
+      this.populateMetaFields = false;
+      this.recordKeyField = keyFieldOverride.get();
+      this.partitionPathFieldOpt = Option.empty();
+    } else if (tableConfig.populateMetaFields()) {
+      this.populateMetaFields = true;
+      this.recordKeyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
+      this.partitionPathFieldOpt = Option.of(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    } else {
+      this.populateMetaFields = false;
+      this.recordKeyField = tableConfig.getRecordKeyFieldProp();
+      this.partitionPathFieldOpt = Option.of(tableConfig.getPartitionFieldProp());
     }
-    ValidationUtils.checkState(this.simpleKeyGenFields.isPresent());
-    return this.simpleKeyGenFields.get().getKey();
-  }
-
-  public synchronized void scan() {
-    scanInternal(Option.empty(), false);
-  }
 
-  public synchronized void scan(List<String> keys) {
-    scanInternal(Option.of(new KeySpec(keys, true)), false);
+    this.partitionNameOverrideOpt = partitionNameOverride;
+    this.recordType = recordMerger.getRecordType();
   }
 
-  public synchronized void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
-    if (useScanV2) {
-      scanInternalV2(keySpecOpt, skipProcessingBlocks);
-    } else {
-      scanInternal(keySpecOpt);
+  /**
+   * @param keySpecOpt specifies target set of keys to be scanned
+   * @param skipProcessingBlocks controls, whether (delta) blocks have to actually be processed
+   */
+  protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
+    synchronized (this) {
+      if (useScanV2) {
+        scanInternalV2(keySpecOpt, skipProcessingBlocks);
+      } else {
+        scanInternalV1(keySpecOpt);
+      }
     }
   }
 
-  private synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
+  private void scanInternalV1(Option<KeySpec> keySpecOpt) {
     currentInstantLogBlocks = new ArrayDeque<>();
     progress = 0.0f;
     totalLogFiles = new AtomicLong(0);
@@ -245,15 +238,10 @@ public abstract class AbstractHoodieLogRecordReader {
     HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
     HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
     try {
-      // Get the key field based on populate meta fields config
-      // and the table type
-      final String keyField = getKeyField();
-
       // Iterate over the paths
-      boolean enableRecordLookups = !forceFullScan;
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
-          readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
 
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
@@ -398,7 +386,7 @@ public abstract class AbstractHoodieLogRecordReader {
     }
   }
 
-  private synchronized void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
     currentInstantLogBlocks = new ArrayDeque<>();
     progress = 0.0f;
     totalLogFiles = new AtomicLong(0);
@@ -411,16 +399,10 @@ public abstract class AbstractHoodieLogRecordReader {
     HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
     HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
     try {
-
-      // Get the key field based on populate meta fields config
-      // and the table type
-      final String keyField = getKeyField();
-
-      boolean enableRecordLookups = !forceFullScan;
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
-          readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
 
       /**
        * Scanning log blocks and placing the compacted blocks at the right place require two traversals.
@@ -638,25 +620,28 @@ public abstract class AbstractHoodieLogRecordReader {
    * handle it.
    */
   private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
+    checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
+        "Either partition-name override or partition-path field had to be present");
+
+    Option<Pair<String, String>> recordKeyPartitionPathFieldPair = populateMetaFields
+        ? Option.empty()
+        : Option.of(Pair.of(recordKeyField, partitionPathFieldOpt.orElse(null)));
+
     try (ClosableIteratorWithSchema<HoodieRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt)) {
       while (recordIterator.hasNext()) {
         HoodieRecord completedRecord = recordIterator.next()
             .wrapIntoHoodieRecordPayloadWithParams(recordIterator.getSchema(),
                 hoodieTableMetaClient.getTableConfig().getProps(),
-                this.simpleKeyGenFields,
+                recordKeyPartitionPathFieldPair,
                 this.withOperationField,
-                this.partitionName,
-                getPopulateMetaFields());
+                this.partitionNameOverrideOpt,
+                populateMetaFields);
         processNextRecord(completedRecord);
         totalLogRecords.incrementAndGet();
       }
     }
   }
 
-  protected boolean getPopulateMetaFields() {
-    return this.populateMetaFields;
-  }
-
   /**
    * Process next record.
    *
@@ -682,13 +667,9 @@ public abstract class AbstractHoodieLogRecordReader {
       HoodieLogBlock lastBlock = logBlocks.pollLast();
       switch (lastBlock.getBlockType()) {
         case AVRO_DATA_BLOCK:
-          processDataBlock((HoodieAvroDataBlock) lastBlock, keySpecOpt);
-          break;
         case HFILE_DATA_BLOCK:
-          processDataBlock((HoodieHFileDataBlock) lastBlock, keySpecOpt);
-          break;
         case PARQUET_DATA_BLOCK:
-          processDataBlock((HoodieParquetDataBlock) lastBlock, keySpecOpt);
+          processDataBlock((HoodieDataBlock) lastBlock, keySpecOpt);
           break;
         case DELETE_BLOCK:
           Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
@@ -704,6 +685,12 @@ public abstract class AbstractHoodieLogRecordReader {
     progress = (numLogFilesSeen - 1) / logFilePaths.size();
   }
 
+  private boolean shouldLookupRecords() {
+    // NOTE: Point-wise record lookups are only enabled when scanner is not in
+    //       a full-scan mode
+    return !forceFullScan;
+  }
+
   /**
    * Return progress of scanning as a float between 0.0 to 1.0.
    */
@@ -727,8 +714,8 @@ public abstract class AbstractHoodieLogRecordReader {
     return payloadClassFQN;
   }
 
-  public Option<String> getPartitionName() {
-    return partitionName;
+  public Option<String> getPartitionNameOverride() {
+    return partitionNameOverrideOpt;
   }
 
   public long getTotalRollbacks() {
@@ -750,13 +737,52 @@ public abstract class AbstractHoodieLogRecordReader {
   /**
    * Key specification with a list of column names.
    */
-  protected static class KeySpec {
-    private final List<String> keys;
-    private final boolean fullKey;
+  protected interface KeySpec {
+    List<String> getKeys();
+
+    boolean isFullKey();
 
-    public KeySpec(List<String> keys, boolean fullKey) {
+    static KeySpec fullKeySpec(List<String> keys) {
+      return new FullKeySpec(keys);
+    }
+
+    static KeySpec prefixKeySpec(List<String> keyPrefixes) {
+      return new PrefixKeySpec(keyPrefixes);
+    }
+  }
+
+  private static class FullKeySpec implements KeySpec {
+    private final List<String> keys;
+    private FullKeySpec(List<String> keys) {
       this.keys = keys;
-      this.fullKey = fullKey;
+    }
+
+    @Override
+    public List<String> getKeys() {
+      return keys;
+    }
+
+    @Override
+    public boolean isFullKey() {
+      return true;
+    }
+  }
+
+  private static class PrefixKeySpec implements KeySpec {
+    private final List<String> keysPrefixes;
+
+    private PrefixKeySpec(List<String> keysPrefixes) {
+      this.keysPrefixes = keysPrefixes;
+    }
+
+    @Override
+    public List<String> getKeys() {
+      return keysPrefixes;
+    }
+
+    @Override
+    public boolean isFullKey() {
+      return false;
     }
   }
 
@@ -774,7 +800,7 @@ public abstract class AbstractHoodieLogRecordReader {
     if (keySpecOpt.isPresent()) {
       KeySpec keySpec = keySpecOpt.get();
       blockRecordsIterator = (ClosableIterator) dataBlock
-          .getRecordIterator(keySpec.keys, keySpec.fullKey, recordType);
+          .getRecordIterator(keySpec.getKeys(), keySpec.isFullKey(), recordType);
     } else {
       blockRecordsIterator = (ClosableIterator) dataBlock.getRecordIterator(recordType);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 3ff1432f640..c41d78b2808 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -44,14 +44,19 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of records which will
@@ -65,40 +70,43 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
  * <p>
  * This results in two I/O passes over the log file.
  */
-
+@NotThreadSafe
 public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
-    implements Iterable<HoodieRecord> {
+    implements Iterable<HoodieRecord>, Closeable {
 
   private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
   // A timer for calculating elapsed time in millis
   public final HoodieTimer timer = new HoodieTimer();
-  // Final map of compacted/merged records
-  protected final ExternalSpillableMap<String, HoodieRecord> records;
+  // Map of compacted/merged records
+  private final ExternalSpillableMap<String, HoodieRecord> records;
+  // Set of already scanned prefixes allowing us to avoid scanning same prefixes again
+  private final Set<String> scannedPrefixes;
   // count of merged records in log
   private long numMergedRecordsInLog;
-  private long maxMemorySizeInBytes;
+  private final long maxMemorySizeInBytes;
   // Stores the total time taken to perform reading and merging of log blocks
   private long totalTimeTakenToReadAndMergeBlocks;
 
   @SuppressWarnings("unchecked")
-  protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
-                                         String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
-                                         boolean reverseReader, int bufferSize, String spillableMapBasePath,
-                                         Option<InstantRange> instantRange,
-                                         ExternalSpillableMap.DiskMapType diskMapType,
-                                         boolean isBitCaskDiskMapCompressionEnabled,
-                                         boolean withOperationField, boolean forceFullScan,
-                                         Option<String> partitionName, InternalSchema internalSchema,
-                                         boolean useScanV2, HoodieRecordMerger recordMerger) {
+  private HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
+                                       String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
+                                       boolean reverseReader, int bufferSize, String spillableMapBasePath,
+                                       Option<InstantRange> instantRange,
+                                       ExternalSpillableMap.DiskMapType diskMapType,
+                                       boolean isBitCaskDiskMapCompressionEnabled,
+                                       boolean withOperationField, boolean forceFullScan,
+                                       Option<String> partitionName,
+                                       InternalSchema internalSchema,
+                                       Option<String> keyFieldOverride,
+                                       boolean useScanV2, HoodieRecordMerger recordMerger) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
-        instantRange, withOperationField,
-        forceFullScan, partitionName, internalSchema, useScanV2, recordMerger);
+        instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, useScanV2, recordMerger);
     try {
+      this.maxMemorySizeInBytes = maxMemorySizeInBytes;
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
           new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
-
-      this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+      this.scannedPrefixes = new HashSet<>();
     } catch (IOException e) {
       throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
     }
@@ -108,30 +116,106 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     }
   }
 
-  protected void performScan() {
+  /**
+   * Scans delta-log files processing blocks
+   */
+  public final void scan() {
+    scan(false);
+  }
+
+  public final void scan(boolean skipProcessingBlocks) {
+    if (forceFullScan) {
+      // NOTE: When full-scan is enforced, scanning is invoked upfront (during initialization)
+      return;
+    }
+
+    scanInternal(Option.empty(), skipProcessingBlocks);
+  }
+
+  /**
+   * Provides incremental scanning capability where only provided keys will be looked
+   * up in the delta-log files, scanned and subsequently materialized into the internal
+   * cache
+   *
+   * @param keys to be looked up
+   */
+  public void scanByFullKeys(List<String> keys) {
+    // We can skip scanning in case reader is in full-scan mode, in which case all blocks
+    // are processed upfront (no additional scanning is necessary)
+    if (forceFullScan) {
+      return; // no-op
+    }
+
+    List<String> missingKeys = keys.stream()
+        .filter(key -> !records.containsKey(key))
+        .collect(Collectors.toList());
+
+    if (missingKeys.isEmpty()) {
+      // All the required records are already fetched, no-op
+      return;
+    }
+
+    scanInternal(Option.of(KeySpec.fullKeySpec(missingKeys)), false);
+  }
+
+  /**
+   * Provides incremental scanning capability where only keys matching provided key-prefixes
+   * will be looked up in the delta-log files, scanned and subsequently materialized into
+   * the internal cache
+   *
+   * @param keyPrefixes to be looked up
+   */
+  public void scanByKeyPrefixes(List<String> keyPrefixes) {
+    // We can skip scanning in case reader is in full-scan mode, in which case all blocks
+    // are processed upfront (no additional scanning is necessary)
+    if (forceFullScan) {
+      return;
+    }
+
+    List<String> missingKeyPrefixes = keyPrefixes.stream()
+        .filter(keyPrefix ->
+            // NOTE: We can skip scanning the prefixes that have already
+            //       been covered by the previous scans
+            scannedPrefixes.stream().noneMatch(keyPrefix::startsWith))
+        .collect(Collectors.toList());
+
+    if (missingKeyPrefixes.isEmpty()) {
+      // All the required records are already fetched, no-op
+      return;
+    }
+
+    // NOTE: When looking up by key-prefixes unfortunately we can't short-circuit
+    //       and will have to scan every time as we can't know (based on just
+    //       the records cached) whether particular prefix was scanned or just records
+    //       matching the prefix looked up (by [[scanByFullKeys]] API)
+    scanInternal(Option.of(KeySpec.prefixKeySpec(missingKeyPrefixes)), false);
+    scannedPrefixes.addAll(missingKeyPrefixes);
+  }
+
+  private void performScan() {
     // Do the scan and merge
     timer.startTimer();
-    scan();
+
+    scanInternal(Option.empty(), false);
+
     this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
     this.numMergedRecordsInLog = records.size();
+
     LOG.info("Number of log files scanned => " + logFilePaths.size());
     LOG.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);
     LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
-    LOG.info(
-        "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
+    LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
     LOG.info("Number of entries in BitCaskDiskMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
     LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
   }
 
   @Override
   public Iterator<HoodieRecord> iterator() {
-    checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
     return records.iterator();
   }
 
   public Map<String, HoodieRecord> getRecords() {
-    checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
-    return records;
+    return Collections.unmodifiableMap(records);
   }
 
   public HoodieRecordType getRecordType() {
@@ -152,20 +236,20 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
   @Override
   protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
     String key = newRecord.getRecordKey();
-    if (records.containsKey(key)) {
-      // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
-      // done when a DELETE (empty payload) is encountered before or after an insert/update.
-
-      HoodieRecord<T> oldRecord = records.get(key);
-      T oldValue = oldRecord.getData();
-      HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(oldRecord, readerSchema,
+    HoodieRecord<T> prevRecord = records.get(key);
+    if (prevRecord != null) {
+      // Merge and store the combined record
+      HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
           newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
-      // If combinedValue is oldValue, no need rePut oldRecord
-      if (combinedRecord.getData() != oldValue) {
-        HoodieRecord latestHoodieRecord = combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
+      // If pre-combine returns existing record, no need to update it
+      if (combinedRecord.getData() != prevRecord.getData()) {
+        HoodieRecord latestHoodieRecord =
+            combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
+
         latestHoodieRecord.unseal();
         latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
         latestHoodieRecord.seal();
+
         // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
         //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
         //       it since these records will be put into records(Map).
@@ -215,6 +299,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     return totalTimeTakenToReadAndMergeBlocks;
   }
 
+  @Override
   public void close() {
     if (records != null) {
       records.close();
@@ -225,27 +310,28 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
    * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
    */
   public static class Builder extends AbstractHoodieLogRecordReader.Builder {
-    protected FileSystem fs;
-    protected String basePath;
-    protected List<String> logFilePaths;
-    protected Schema readerSchema;
+    private FileSystem fs;
+    private String basePath;
+    private List<String> logFilePaths;
+    private Schema readerSchema;
     private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
-    protected String latestInstantTime;
-    protected boolean readBlocksLazily;
-    protected boolean reverseReader;
-    protected int bufferSize;
+    private String latestInstantTime;
+    private boolean readBlocksLazily;
+    private boolean reverseReader;
+    private int bufferSize;
     // specific configurations
-    protected Long maxMemorySizeInBytes;
-    protected String spillableMapBasePath;
-    protected ExternalSpillableMap.DiskMapType diskMapType = HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue();
-    protected boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
+    private Long maxMemorySizeInBytes;
+    private String spillableMapBasePath;
+    private ExternalSpillableMap.DiskMapType diskMapType = HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue();
+    private boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
     // incremental filtering
-    protected Option<InstantRange> instantRange = Option.empty();
-    protected String partitionName;
-    // auto scan default true
-    private boolean autoScan = true;
+    private Option<InstantRange> instantRange = Option.empty();
+    private String partitionName;
     // operation field default false
     private boolean withOperationField = false;
+    private String keyFieldOverride;
+    // By default, we're doing a full-scan
+    private boolean forceFullScan = true;
     // Use scanV2 method.
     private boolean useScanV2 = false;
     private HoodieRecordMerger recordMerger;
@@ -355,6 +441,16 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
       return this;
     }
 
+    public Builder withKeyFiledOverride(String keyFieldOverride) {
+      this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride);
+      return this;
+    }
+
+    public Builder withForceFullScan(boolean forceFullScan) {
+      this.forceFullScan = forceFullScan;
+      return this;
+    }
+
     @Override
     public HoodieMergedLogRecordScanner build() {
       if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) {
@@ -365,8 +461,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
       return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
           latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
           bufferSize, spillableMapBasePath, instantRange,
-          diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true,
-          Option.ofNullable(partitionName), internalSchema, useScanV2, recordMerger);
+          diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
+          Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), useScanV2, recordMerger);
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index c5468f00512..2a7c91641e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -44,10 +44,21 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
                                          LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema,
                                          boolean useScanV2, HoodieRecordMerger recordMerger) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange,
-        false, true, Option.empty(), internalSchema, useScanV2, recordMerger);
+        false, true, Option.empty(), internalSchema, Option.empty(), useScanV2, recordMerger);
     this.callback = callback;
   }
 
+  /**
+   * Scans delta-log files processing blocks
+   */
+  public final void scan() {
+    scan(false);
+  }
+
+  public final void scan(boolean skipProcessingBlocks) {
+    scanInternal(Option.empty(), skipProcessingBlocks);
+  }
+
   /**
    * Returns the builder for {@code HoodieUnMergedLogRecordScanner}.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
index f491adc6cab..3faddb91b4d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -166,7 +166,7 @@ public class CollectionUtils {
   /**
    * Returns difference b/w {@code one} {@link Set} of elements and {@code another}
    */
-  public static <E> Set<E> diff(Collection<E> one, Collection<E> another) {
+  public static <E> Set<E> diffSet(Set<E> one, Set<E> another) {
     Set<E> diff = new HashSet<>(one);
     diff.removeAll(another);
     return diff;
@@ -178,7 +178,7 @@ public class CollectionUtils {
    * NOTE: This is less optimal counterpart to {@link #diff(Collection, Collection)}, accepting {@link List}
    *       as a holding collection to support duplicate elements use-cases
    */
-  public static <E> List<E> diff(List<E> one, List<E> another) {
+  public static <E> List<E> diff(Collection<E> one, Collection<E> another) {
     List<E> diff = new ArrayList<>(one);
     diff.removeAll(another);
     return diff;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 218f0d9f16e..ee930e588d0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -24,6 +24,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -51,6 +52,7 @@ import java.util.stream.Stream;
  * map may occupy more memory than is available, resulting in OOM. However, if the spill threshold is too low, we spill
  * frequently and incur unnecessary disk writes.
  */
+@NotThreadSafe
 public class ExternalSpillableMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Serializable {
 
   // Find the actual estimated payload size after inserting N records
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 2721795081c..26780265d03 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -46,6 +46,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
@@ -75,7 +76,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.CollectionUtils.toStream;
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
@@ -99,7 +99,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   private final boolean reuse;
 
   // Readers for the latest file slice corresponding to file groups in the metadata partition
-  private final Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader>> partitionReaders =
+  private final Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>> partitionReaders =
       new ConcurrentHashMap<>();
 
   public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
@@ -177,13 +177,13 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
             (SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
               // NOTE: Since this will be executed by executors, we can't access previously cached
               //       readers, and therefore have to always open new ones
-              Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
+              Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
                   openReaders(partitionName, fileSlice);
               try {
                 List<Long> timings = new ArrayList<>();
 
                 HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-                HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+                HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
 
                 if (baseFileReader == null && logRecordScanner == null) {
                   // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
@@ -223,12 +223,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
     AtomicInteger fileSlicesKeysCount = new AtomicInteger();
     partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
-      Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
+      Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
           getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
       try {
         List<Long> timings = new ArrayList<>();
         HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-        HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+        HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
         if (baseFileReader == null && logRecordScanner == null) {
           return;
         }
@@ -255,38 +255,35 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return result;
   }
 
-  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
+  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
                                                                                   List<String> keys,
                                                                                   boolean fullKey,
                                                                                   List<Long> timings) {
     HoodieTimer timer = HoodieTimer.start();
 
-    if (logRecordScanner == null) {
+    if (logRecordReader == null) {
       timings.add(timer.endTimer());
       return Collections.emptyMap();
     }
 
-    String partitionName = logRecordScanner.getPartitionName().get();
+    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>(keys.size());
 
-    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
-    if (isFullScanAllowedForPartition(partitionName)) {
-      checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
-      // Path which does full scan of log files
-      for (String key : keys) {
-        logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
-      }
-    } else {
-      // This path will do seeks pertaining to the keys passed in
-      List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
-          fullKey ? logRecordScanner.getRecordsByKeys(keys)
-              : logRecordScanner.getRecordsByKeyPrefixes(keys)
-                  .stream()
-                  .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
-                  .collect(Collectors.toList());
-
-      for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
-        logRecords.put(entry.getKey(), entry.getValue());
-      }
+    // First, fetch the records for the keys being looked up
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
+        (fullKey ? logRecordReader.getRecordsByKeys(keys) : logRecordReader.getRecordsByKeyPrefixes(keys))
+                .stream()
+                .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
+                .collect(Collectors.toList());
+
+    for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
+      logRecords.put(entry.getKey(), entry.getValue());
+    }
+
+    // Second, back-fill keys not present in the log-blocks (such that the map holds
+    // a record for every key being looked up)
+    List<String> missingKeys = CollectionUtils.diff(keys, logRecords.keySet());
+    for (String key : missingKeys) {
+      logRecords.put(key, Option.empty());
     }
 
     timings.add(timer.endTimer());
@@ -415,7 +412,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    * @param slice            - The file slice to open readers for
    * @return File reader and the record scanner pair for the requested file slice
    */
-  private Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
+  private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
     if (reuse) {
       Pair<String, String> key = Pair.of(partitionName, slice.getFileId());
       return partitionReaders.computeIfAbsent(key, ignored -> openReaders(partitionName, slice));
@@ -424,7 +421,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     }
   }
 
-  private Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {
+  private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> openReaders(String partitionName, FileSlice slice) {
     try {
       HoodieTimer timer = HoodieTimer.start();
       // Open base file reader
@@ -434,9 +431,9 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
       // Open the log record scanner using the log files from the latest file slice
       List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
-      Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
+      Pair<HoodieMetadataLogRecordReader, Long> logRecordScannerOpenTimePair =
           getLogRecordScanner(logFiles, partitionName, Option.empty());
-      HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
+      HoodieMetadataLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
       final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
 
       metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,
@@ -489,9 +486,9 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return validInstantTimestamps;
   }
 
-  public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
-                                                                             String partitionName,
-                                                                             Option<Boolean> allowFullScanOverride) {
+  public Pair<HoodieMetadataLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
+                                                                       String partitionName,
+                                                                       Option<Boolean> allowFullScanOverride) {
     HoodieTimer timer = HoodieTimer.start();
     List<String> sortedLogFilePaths = logFiles.stream()
         .sorted(HoodieLogFile.getLogFileComparator())
@@ -510,7 +507,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     // Load the schema
     Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
     HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
-    HoodieMetadataMergedLogRecordReader logRecordScanner = HoodieMetadataMergedLogRecordReader.newBuilder()
+    HoodieMetadataLogRecordReader logRecordScanner = HoodieMetadataLogRecordReader.newBuilder()
         .withFileSystem(metadataMetaClient.getFs())
         .withBasePath(metadataBasePath)
         .withLogFilePaths(sortedLogFilePaths)
@@ -522,7 +519,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         .withDiskMapType(commonConfig.getSpillableDiskMapType())
         .withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
         .withLogBlockTimestamps(validInstantTimestamps)
-        .allowFullScan(allowFullScan)
+        .enableFullScan(allowFullScan)
         .withPartition(partitionName)
         .withUseScanV2(metadataConfig.getUseLogRecordReaderScanV2())
         .build();
@@ -588,7 +585,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    * @param partitionFileSlicePair - Partition and FileSlice
    */
   private synchronized void close(Pair<String, String> partitionFileSlicePair) {
-    Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
         partitionReaders.remove(partitionFileSlicePair);
     closeReader(readers);
   }
@@ -603,7 +600,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     partitionReaders.clear();
   }
 
-  private void closeReader(Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers) {
+  private void closeReader(Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers) {
     if (readers != null) {
       try {
         if (readers.getKey() != null) {
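
To make the new look-up contract of readLogRecords above concrete, here is a self-contained sketch (illustration only, using Optional in place of Hudi's Option): every requested key ends up in the result map, with misses explicitly mapped to empty.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    public class BackfillSketch {
      public static void main(String[] args) {
        List<String> keys = Arrays.asList("k1", "k2", "k3");

        // Records actually found in the delta-log blocks (hypothetical hit for "k1")
        Map<String, String> foundInLogBlocks = new HashMap<>();
        foundInLogBlocks.put("k1", "record-1");

        // First, map the hits; second, back-fill the misses with an empty value
        Map<String, Optional<String>> result = new HashMap<>(keys.size());
        foundInLogBlocks.forEach((k, v) -> result.put(k, Optional.of(v)));
        keys.forEach(k -> result.putIfAbsent(k, Optional.empty()));

        // {k1=Optional[record-1], k2=Optional.empty, k3=Optional.empty}
        System.out.println(result);
      }
    }
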
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
new file mode 100644
index 00000000000..48b9d66f89b
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Metadata Table log-block record reader, internally relying on
+ * {@link HoodieMergedLogRecordScanner} to merge the Metadata Table's delta log-block
+ * sequence.
+ */
+@ThreadSafe
+public class HoodieMetadataLogRecordReader implements Closeable {
+
+  private final HoodieMergedLogRecordScanner logRecordScanner;
+
+  private HoodieMetadataLogRecordReader(HoodieMergedLogRecordScanner logRecordScanner) {
+    this.logRecordScanner = logRecordScanner;
+  }
+
+  /**
+   * Returns the builder for {@code HoodieMetadataLogRecordReader}.
+   */
+  public static HoodieMetadataLogRecordReader.Builder newBuilder() {
+    return new HoodieMetadataLogRecordReader.Builder();
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> getRecords() {
+    // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
+    //       materialized state, to make sure there's no concurrent access
+    synchronized (this) {
+      logRecordScanner.scan();
+      return logRecordScanner.getRecords().values()
+          .stream()
+          .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
+          .collect(Collectors.toList());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
+    if (keyPrefixes.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
+    //       materialized state, to make sure there's no concurrent access
+    synchronized (this) {
+      logRecordScanner.scanByKeyPrefixes(keyPrefixes);
+      Map<String, HoodieRecord> allRecords = logRecordScanner.getRecords();
+
+      Predicate<String> p = createPrefixMatchingPredicate(keyPrefixes);
+      return allRecords.entrySet()
+          .stream()
+          .filter(r -> r != null && p.test(r.getKey()))
+          .map(r -> (HoodieRecord<HoodieMetadataPayload>) r.getValue())
+          .collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Fetches the records identified by the provided list of keys, if these are present in
+   * the delta-log blocks.
+   */
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeys(List<String> keys) {
+    if (keys.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
+    //       materialized state, to make sure there's no concurrent access
+    synchronized (this) {
+      logRecordScanner.scanByFullKeys(keys);
+      Map<String, HoodieRecord> allRecords = logRecordScanner.getRecords();
+      return keys.stream()
+          .map(key -> (HoodieRecord<HoodieMetadataPayload>) allRecords.get(key))
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList());
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    logRecordScanner.close();
+  }
+
+  private static Predicate<String> createPrefixMatchingPredicate(List<String> keyPrefixes) {
+    if (keyPrefixes.size() == 1) {
+      String keyPrefix = keyPrefixes.get(0);
+      return key -> key.startsWith(keyPrefix);
+    }
+
+    return key -> keyPrefixes.stream().anyMatch(key::startsWith);
+  }
+
+  /**
+   * Builder used to build {@code HoodieMetadataLogRecordReader}.
+   */
+  public static class Builder {
+    private final HoodieMergedLogRecordScanner.Builder scannerBuilder =
+        new HoodieMergedLogRecordScanner.Builder()
+            .withKeyFiledOverride(HoodieMetadataPayload.KEY_FIELD_NAME)
+            // NOTE: Merging of Metadata Table's records is currently handled using {@code HoodieAvroRecordMerger}
+            //       for compatibility purposes; in the future, {@code HoodieMetadataPayload} semantics
+            //       will be migrated to its own custom instance of {@code RecordMerger}
+            .withRecordMerger(new HoodieAvroRecordMerger())
+            .withReadBlocksLazily(true)
+            .withReverseReader(false)
+            .withOperationField(false);
+
+    public Builder withFileSystem(FileSystem fs) {
+      scannerBuilder.withFileSystem(fs);
+      return this;
+    }
+
+    public Builder withBasePath(String basePath) {
+      scannerBuilder.withBasePath(basePath);
+      return this;
+    }
+
+    public Builder withLogFilePaths(List<String> logFilePaths) {
+      scannerBuilder.withLogFilePaths(logFilePaths);
+      return this;
+    }
+
+    public Builder withReaderSchema(Schema schema) {
+      scannerBuilder.withReaderSchema(schema);
+      return this;
+    }
+
+    public Builder withLatestInstantTime(String latestInstantTime) {
+      scannerBuilder.withLatestInstantTime(latestInstantTime);
+      return this;
+    }
+
+    public Builder withBufferSize(int bufferSize) {
+      scannerBuilder.withBufferSize(bufferSize);
+      return this;
+    }
+
+    public Builder withPartition(String partitionName) {
+      scannerBuilder.withPartition(partitionName);
+      return this;
+    }
+
+    public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
+      scannerBuilder.withMaxMemorySizeInBytes(maxMemorySizeInBytes);
+      return this;
+    }
+
+    public Builder withSpillableMapBasePath(String spillableMapBasePath) {
+      scannerBuilder.withSpillableMapBasePath(spillableMapBasePath);
+      return this;
+    }
+
+    public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
+      scannerBuilder.withDiskMapType(diskMapType);
+      return this;
+    }
+
+    public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) {
+      scannerBuilder.withBitCaskDiskMapCompressionEnabled(isBitCaskDiskMapCompressionEnabled);
+      return this;
+    }
+
+    public Builder withLogBlockTimestamps(Set<String> validLogBlockTimestamps) {
+      scannerBuilder.withInstantRange(Option.of(new ExplicitMatchRange(validLogBlockTimestamps)));
+      return this;
+    }
+
+    public Builder enableFullScan(boolean enableFullScan) {
+      scannerBuilder.withForceFullScan(enableFullScan);
+      return this;
+    }
+
+    public Builder withUseScanV2(boolean useScanV2) {
+      scannerBuilder.withUseScanV2(useScanV2);
+      return this;
+    }
+
+    public HoodieMetadataLogRecordReader build() {
+      return new HoodieMetadataLogRecordReader(scannerBuilder.build());
+    }
+  }
+
+  /**
+   * Class to assist in checking if an instant is part of a set of instants.
+   */
+  private static class ExplicitMatchRange extends InstantRange {
+    Set<String> instants;
+
+    public ExplicitMatchRange(Set<String> instants) {
+      super(Collections.min(instants), Collections.max(instants));
+      this.instants = instants;
+    }
+
+    @Override
+    public boolean isInRange(String instant) {
+      return this.instants.contains(instant);
+    }
+  }
+}
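
A hedged usage sketch of the new reader (keys below are made up; the builder wiring mirrors getLogRecordScanner above): the reader is built once per file slice, after which repeated key and key-prefix look-ups reuse the scanner's materialized state instead of re-scanning the log blocks.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
    import org.apache.hudi.metadata.HoodieMetadataPayload;

    public class MetadataReaderUsageSketch {
      // Assumes the reader was already constructed via HoodieMetadataLogRecordReader.newBuilder(),
      // the same way HoodieBackedTableMetadata#getLogRecordScanner wires it up.
      static void lookups(HoodieMetadataLogRecordReader reader) {
        // Point look-ups by full key: only the requested keys are merged and returned
        List<HoodieRecord<HoodieMetadataPayload>> byKeys =
            reader.getRecordsByKeys(Arrays.asList("some-record-key"));      // hypothetical key

        // Prefix look-ups: every record whose key starts with the prefix is returned
        List<HoodieRecord<HoodieMetadataPayload>> byPrefixes =
            reader.getRecordsByKeyPrefixes(Arrays.asList("some-prefix-"));  // hypothetical prefix

        // Re-issuing either call does not flush the underlying scanner's cache
        System.out.println(byKeys.size() + " / " + byPrefixes.size());
      }
    }
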
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
deleted file mode 100644
index aec9877f07d..00000000000
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.metadata;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.table.log.InstantRange;
-import org.apache.hudi.common.util.HoodieRecordUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.internal.schema.InternalSchema;
-
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
-
-/**
- * A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is
- * useful in limiting memory usage when only a small subset of updates records are to be read.
- */
-public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordScanner {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class);
-
-  private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, String partitionName,
-                                              List<String> logFilePaths,
-                                              Schema readerSchema, String latestInstantTime,
-                                              Long maxMemorySizeInBytes, int bufferSize,
-                                              String spillableMapBasePath,
-                                              ExternalSpillableMap.DiskMapType diskMapType,
-                                              boolean isBitCaskDiskMapCompressionEnabled,
-                                              Option<InstantRange> instantRange, boolean allowFullScan, boolean useScanV2) {
-    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, true, false, bufferSize,
-        spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, false, allowFullScan,
-            Option.of(partitionName), InternalSchema.getEmptyInternalSchema(), useScanV2, HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
-  }
-
-  /**
-   * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
-   */
-  public static HoodieMetadataMergedLogRecordReader.Builder newBuilder() {
-    return new HoodieMetadataMergedLogRecordReader.Builder();
-  }
-
-  /**
-   * Retrieve a record given its key.
-   *
-   * @param key Key of the record to retrieve
-   * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
-   */
-  public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordByKey(String key) {
-    checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
-    return Collections.singletonList(Pair.of(key, Option.ofNullable((HoodieRecord) records.get(key))));
-  }
-
-  @SuppressWarnings("unchecked")
-  public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
-    // Following operations have to be atomic, otherwise concurrent
-    // readers would race with each other and could crash when
-    // processing log block records as part of scan.
-    synchronized (this) {
-      records.clear();
-      scanInternal(Option.of(new KeySpec(keyPrefixes, false)), false);
-      return records.values().stream()
-          .filter(Objects::nonNull)
-          .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
-          .collect(Collectors.toList());
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys) {
-    // Following operations have to be atomic, otherwise concurrent
-    // readers would race with each other and could crash when
-    // processing log block records as part of scan.
-    synchronized (this) {
-      records.clear();
-      scan(keys);
-      return keys.stream()
-          .map(key -> Pair.of(key, Option.ofNullable((HoodieRecord<HoodieMetadataPayload>) records.get(key))))
-          .collect(Collectors.toList());
-    }
-  }
-
-  @Override
-  protected boolean getPopulateMetaFields() {
-    return this.hoodieTableMetaClient.getTableConfig().populateMetaFields() && super.getPopulateMetaFields();
-  }
-
-  @Override
-  protected String getKeyField() {
-    return HoodieMetadataPayload.KEY_FIELD_NAME;
-  }
-
-  /**
-   * Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
-   */
-  public static class Builder extends HoodieMergedLogRecordScanner.Builder {
-
-    private boolean allowFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
-
-    // Use scanV2 method.
-    private boolean useScanV2 = false;
-
-    @Override
-    public Builder withFileSystem(FileSystem fs) {
-      this.fs = fs;
-      return this;
-    }
-
-    @Override
-    public Builder withBasePath(String basePath) {
-      this.basePath = basePath;
-      return this;
-    }
-
-    @Override
-    public Builder withLogFilePaths(List<String> logFilePaths) {
-      this.logFilePaths = logFilePaths;
-      return this;
-    }
-
-    @Override
-    public Builder withReaderSchema(Schema schema) {
-      this.readerSchema = schema;
-      return this;
-    }
-
-    @Override
-    public Builder withInternalSchema(InternalSchema internalSchema) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Builder withLatestInstantTime(String latestInstantTime) {
-      this.latestInstantTime = latestInstantTime;
-      return this;
-    }
-
-    @Override
-    public Builder withReadBlocksLazily(boolean readBlocksLazily) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Builder withReverseReader(boolean reverseReader) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Builder withBufferSize(int bufferSize) {
-      this.bufferSize = bufferSize;
-      return this;
-    }
-
-    @Override
-    public Builder withPartition(String partitionName) {
-      this.partitionName = partitionName;
-      return this;
-    }
-
-    @Override
-    public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
-      this.maxMemorySizeInBytes = maxMemorySizeInBytes;
-      return this;
-    }
-
-    @Override
-    public Builder withSpillableMapBasePath(String spillableMapBasePath) {
-      this.spillableMapBasePath = spillableMapBasePath;
-      return this;
-    }
-
-    @Override
-    public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
-      this.diskMapType = diskMapType;
-      return this;
-    }
-
-    @Override
-    public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) {
-      this.isBitCaskDiskMapCompressionEnabled = isBitCaskDiskMapCompressionEnabled;
-      return this;
-    }
-
-    public Builder withLogBlockTimestamps(Set<String> validLogBlockTimestamps) {
-      withInstantRange(Option.of(new ExplicitMatchRange(validLogBlockTimestamps)));
-      return this;
-    }
-
-    public Builder allowFullScan(boolean enableFullScan) {
-      this.allowFullScan = enableFullScan;
-      return this;
-    }
-
-    @Override
-    public Builder withUseScanV2(boolean useScanV2) {
-      this.useScanV2 = useScanV2;
-      return this;
-    }
-
-    @Override
-    public HoodieMetadataMergedLogRecordReader build() {
-      return new HoodieMetadataMergedLogRecordReader(fs, basePath, partitionName, logFilePaths, readerSchema,
-          latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath,
-          diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, allowFullScan, useScanV2);
-    }
-  }
-
-  /**
-   * Class to assist in checking if an instant is part of a set of instants.
-   */
-  private static class ExplicitMatchRange extends InstantRange {
-    Set<String> instants;
-
-    public ExplicitMatchRange(Set<String> instants) {
-      super(Collections.min(instants), Collections.max(instants));
-      this.instants = instants;
-    }
-
-    @Override
-    public boolean isInRange(String instant) {
-      return this.instants.contains(instant);
-    }
-  }
-}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 1d9c5f58240..66b1c25cef7 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -93,6 +93,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -107,6 +108,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
@@ -622,28 +624,14 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                   boolean readBlocksLazily,
                                                   boolean useScanv2)
       throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+    // Generate 4 delta-log files w/ random records
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 400);
+
+    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, 4);
 
-    Set<HoodieLogFile> logFiles = new HashSet<>();
-    List<List<IndexedRecord>> allRecords = new ArrayList<>();
-    // create 4 log files
-    while (writer.getLogFile().getLogVersion() != 4) {
-      logFiles.add(writer.getLogFile());
-      List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
-      List<IndexedRecord> copyOfRecords1 = records1.stream()
-          .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
-      allRecords.add(copyOfRecords1);
-      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
-      writer.appendBlock(dataBlock);
-    }
-    writer.close();
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
     // scan all log blocks (across multiple log files)
     HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
@@ -671,11 +659,190 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
           ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
     }
 
-    assertEquals(scannedRecords.size(), allRecords.stream().mapToLong(Collection::size).sum(),
+    assertEquals(sort(genRecords), sort(scannedRecords),
         "Scanner records count should be the same as appended records");
     scanner.close();
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
+                                                boolean isCompressionEnabled,
+                                                boolean readBlocksLazily,
+                                                boolean useScanV2)
+      throws IOException, URISyntaxException, InterruptedException {
+    // Generate 3 delta-log files w/ random records
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 300);
+
+    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, 3);
+
+    FileCreateUtils.createDeltaCommit(basePath, "100", fs);
+
+    // Set up a scanner over all log files w/o forcing an upfront full scan
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(
+            logFiles.stream()
+                .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(1024L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
+        .withDiskMapType(diskMapType)
+        .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withUseScanV2(useScanV2)
+        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
+        .withForceFullScan(false)
+        .build();
+
+    List<String> sampledRecordKeys = Arrays.asList(
+        "b190b1fb-392b-4ceb-932d-a72c906127c2",
+        "409e9ad3-5def-45e7-9180-ef579c1c220b",
+        "e6b31f1c-60a8-4577-acf5-7e8ea318b08b",
+        "0c477a9e-e602-4642-8e96-1cfd357b4ba0",
+        "ea076c17-32ae-4659-8caf-6ad538b4dd8d",
+        "7a943e09-3856-4874-83a1-8ee93e158f94",
+        "9cbff584-d8a4-4b05-868b-dc917d6cf841",
+        "bda0b0d8-0c56-43b0-89f9-e090d924586b",
+        "ee118fb3-69cb-4705-a8c4-88a18e8aa1b7",
+        "cb1fbe4d-06c3-4c9c-aea7-2665ffa8b205"
+    );
+
+    List<IndexedRecord> sampledRecords = genRecords.stream()
+        .filter(r -> sampledRecordKeys.contains(((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()))
+        .collect(Collectors.toList());
+
+    //
+    // Step 1: Scan by a list of keys
+    //
+
+    scanner.scanByFullKeys(sampledRecordKeys);
+
+    List<HoodieRecord> scannedHoodieRecords = new ArrayList<>();
+    List<IndexedRecord> scannedAvroRecords = new ArrayList<>();
+    for (HoodieRecord record : scanner) {
+      scannedHoodieRecords.add(record);
+      scannedAvroRecords.add((IndexedRecord)
+          ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
+    }
+
+    assertEquals(sort(sampledRecords), sort(scannedAvroRecords));
+
+    //
+    // Step 2: Scan by the same list of keys (no new scanning should be performed
+    //         in this case, and the same _objects_ have to be returned)
+    //
+
+    scanner.scanByFullKeys(sampledRecordKeys);
+
+    List<HoodieRecord> newScannedHoodieRecords = new ArrayList<>();
+    for (HoodieRecord record : scanner) {
+      newScannedHoodieRecords.add(record);
+    }
+
+    assertEquals(scannedHoodieRecords.size(), newScannedHoodieRecords.size());
+
+    for (int i = 0; i < scannedHoodieRecords.size(); ++i) {
+      assertSame(scannedHoodieRecords.get(i), newScannedHoodieRecords.get(i), "Objects have to be identical");
+    }
+
+    scanner.close();
+  }
+
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap.DiskMapType diskMapType,
+                                                             boolean isCompressionEnabled,
+                                                             boolean readBlocksLazily,
+                                                             boolean useScanV2)
+      throws IOException, URISyntaxException, InterruptedException {
+    // Generate 3 delta-log files w/ random records
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 300);
+
+    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, 3);
+
+    FileCreateUtils.createDeltaCommit(basePath, "100", fs);
+
+    // Set up a scanner over all log files w/o forcing an upfront full scan
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(
+            logFiles.stream()
+                .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(1024L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
+        .withDiskMapType(diskMapType)
+        .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withUseScanV2(useScanV2)
+        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
+        .withForceFullScan(false)
+        .build();
+
+
+    List<String> sampledRecordKeys = Arrays.asList(
+        "00509b14-3d1a-4283-9a8c-c72b971a9d06",
+        "006b2f57-9bf7-4634-910c-c91542ea61e5",
+        "007fc45d-7ce2-45be-8765-0b9082412518",
+        "00826e50-73b4-4cb0-9d5a-375554d5e0f7"
+    );
+
+    List<IndexedRecord> sampledRecords = genRecords.stream()
+        .filter(r -> sampledRecordKeys.contains(((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()))
+        .collect(Collectors.toList());
+
+    List<String> sampledKeyPrefixes = Collections.singletonList("00");
+
+    //
+    // Step 1: Scan by a list of key prefixes
+    //
+
+    scanner.scanByKeyPrefixes(sampledKeyPrefixes);
+
+    List<HoodieRecord> scannedHoodieRecords = new ArrayList<>();
+    List<IndexedRecord> scannedAvroRecords = new ArrayList<>();
+    for (HoodieRecord record : scanner) {
+      scannedHoodieRecords.add(record);
+      scannedAvroRecords.add((IndexedRecord)
+          ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
+    }
+
+    assertEquals(sort(sampledRecords), sort(scannedAvroRecords));
+
+    //
+    // Step 2: Scan by the same list of key prefixes (no new scanning should be performed
+    //         in this case, and the same _objects_ have to be returned)
+    //
+
+    scanner.scanByKeyPrefixes(sampledKeyPrefixes);
+
+    List<HoodieRecord> newScannedHoodieRecords = new ArrayList<>();
+    for (HoodieRecord record : scanner) {
+      newScannedHoodieRecords.add(record);
+    }
+
+    assertEquals(scannedHoodieRecords.size(), newScannedHoodieRecords.size());
+
+    for (int i = 0; i < scannedHoodieRecords.size(); ++i) {
+      assertSame(scannedHoodieRecords.get(i), newScannedHoodieRecords.get(i), "Objects have to be identical");
+    }
+
+    scanner.close();
+  }
+
   @Test
   public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
     HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
@@ -858,8 +1025,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).withSizeThreshold(500).build();
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+
     // Write 1
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
 
@@ -870,7 +1039,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer.appendBlock(dataBlock);
 
     // Write 2
-    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords2 = records2.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -926,7 +1095,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
 
     // Write 1
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -939,7 +1109,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     // Write 2
     header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
-    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
     writer.appendBlock(dataBlock);
@@ -956,7 +1126,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     // Write 3
     header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
-    List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords3 = records3.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1011,7 +1181,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
 
     // Write 1
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1046,7 +1217,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
     // Write 3
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
-    List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords3 = records3.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
 
@@ -1103,7 +1274,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
 
     // Write 1
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1114,7 +1286,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     // Write 2
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
-    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords2 = records2.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
@@ -1244,7 +1416,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
 
     // Write 1
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1255,7 +1428,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     // Write 2
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
-    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords2 = records2.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
@@ -1367,7 +1540,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
 
     // Write 1
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1379,7 +1553,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer.appendBlock(dataBlock);
 
     // Write 2
-    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
     writer.appendBlock(dataBlock);
@@ -1457,7 +1631,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
 
     // Write 1
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1524,7 +1699,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
 
     // Write 1
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1584,7 +1760,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
 
     // Write 1
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1656,7 +1833,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
 
     // Write 1
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1770,7 +1948,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
 
     // Write 1st data blocks multiple times.
-    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     Set<String> recordKeysOfFirstTwoBatches = records1.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)
             .get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toSet());
@@ -1783,7 +1962,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
 
     // Write 2nd data block
-    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     recordKeysOfFirstTwoBatches.addAll(records2.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)
             .get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList()));
@@ -1796,7 +1975,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     FileCreateUtils.createDeltaCommit(basePath, "101", fs);
 
     // Write 3rd data block
-    List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
     Set<String> recordKeysOfFirstThreeBatches = new HashSet<>(recordKeysOfFirstTwoBatches);
     recordKeysOfFirstThreeBatches.addAll(records3.stream()
         .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)
@@ -1865,7 +2044,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     FileCreateUtils.createDeltaCommit(basePath, "104", fs);
 
     // Write 6th data block
-    List<IndexedRecord> records6 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records6 = testUtil.generateHoodieTestRecords(0, 100);
     header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "105");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1875,7 +2054,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     FileCreateUtils.createDeltaCommit(basePath, "105", fs);
 
     // Write 7th data block
-    List<IndexedRecord> records7 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records7 = testUtil.generateHoodieTestRecords(0, 100);
     header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "106");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1885,7 +2064,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     FileCreateUtils.createDeltaCommit(basePath, "106", fs);
 
     // Write 8th data block
-    List<IndexedRecord> records8 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> records8 = testUtil.generateHoodieTestRecords(0, 100);
     header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "107");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1964,7 +2143,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     try {
       // Write one Data block with same InstantTime (written in same batch)
       Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
-      List<IndexedRecord> records = SchemaTestUtil.generateHoodieTestRecords(0, 101);
+      SchemaTestUtil testUtil = new SchemaTestUtil();
+      List<IndexedRecord> records = testUtil.generateHoodieTestRecords(0, 101);
       List<IndexedRecord> records2 = new ArrayList<>(records);
 
       // Write1 with numRecordsInLog1 records written to log.1
@@ -2360,12 +2540,12 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     }
   }
 
-  private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
+  private static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
                                        Map<HeaderMetadataType, String> header) {
     return getDataBlock(dataBlockType, records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), header, new Path("dummy_path"));
   }
 
-  private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<HoodieRecord> records,
+  private static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<HoodieRecord> records,
                                        Map<HeaderMetadataType, String> header, Path pathForReader) {
     switch (dataBlockType) {
       case CDC_DATA_BLOCK:
@@ -2417,6 +2597,41 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     );
   }
 
+  private static Set<HoodieLogFile> writeLogFiles(Path partitionPath,
+                                                  Schema schema,
+                                                  List<IndexedRecord> records,
+                                                  int numFiles) throws IOException, InterruptedException {
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+
+    Set<HoodieLogFile> logFiles = new HashSet<>();
+
+    // Create log files
+    int recordsPerFile = records.size() / numFiles;
+    int filesWritten = 0;
+
+    while (filesWritten < numFiles) {
+      int targetRecordsCount = filesWritten == numFiles - 1
+          ? recordsPerFile + (records.size() % recordsPerFile)
+          : recordsPerFile;
+      int offset = filesWritten * recordsPerFile;
+      List<IndexedRecord> targetRecords = records.subList(offset, offset + targetRecordsCount);
+
+      logFiles.add(writer.getLogFile());
+      writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords, header));
+
+      filesWritten++;
+    }
+
+    writer.close();
+
+    return logFiles;
+  }
+
   /**
    * Utility to convert the given iterator to a List.
    */
@@ -2427,4 +2642,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     itr.forEachRemaining(r -> elements.add(r.getData()));
     return elements;
   }
+
+  private static List<IndexedRecord> sort(List<IndexedRecord> records) {
+    List<IndexedRecord> copy = new ArrayList<>(records);
+    copy.sort(Comparator.comparing(r -> ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()));
+    return copy;
+  }
 }
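
To make the role of the new private helpers above concrete, here is a hedged sketch of how a test in this class might combine them; it assumes a `Path partitionPath` fixture and a test method that declares the checked exceptions, and every method name it calls comes from this diff:

    // Spread one batch of generated records across several log files with the new
    // writeLogFiles(...) helper, and keep a key-sorted copy (via sort(...)) to compare
    // against whatever a log reader later returns.
    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
    SchemaTestUtil testUtil = new SchemaTestUtil();
    List<IndexedRecord> records = testUtil.generateHoodieTestRecords(0, 100);
    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, records, 4);
    List<IndexedRecord> expected = sort(records);
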
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java
index b6bbc34cc3d..1b4774b2160 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java
@@ -42,7 +42,8 @@ public class TestHoodieRecord {
 
   @BeforeEach
   public void setUp() throws Exception {
-    final List<IndexedRecord> indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    final List<IndexedRecord> indexedRecords = testUtil.generateHoodieTestRecords(0, 1);
     final List<HoodieRecord> hoodieRecords =
         indexedRecords.stream().map(r -> new HoodieAvroRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
             new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index 70d5a1bb3e9..69a2881f400 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -53,10 +53,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.genPseudoRandomUUID;
+
 /**
  * A utility class for testing schema.
  */
@@ -64,6 +65,10 @@ public final class SchemaTestUtil {
 
   private static final String RESOURCE_SAMPLE_DATA = "/sample.data";
 
+  private final Random random = new Random(0xDEED);
+
+  public SchemaTestUtil() {}
+
   public static Schema getSimpleSchema() throws IOException {
     return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/simple-test.avsc"));
   }
@@ -131,25 +136,24 @@ public final class SchemaTestUtil {
     return fs.getPath(array[1]);
   }
 
-  public static List<IndexedRecord> generateHoodieTestRecords(int from, int limit)
+  public List<IndexedRecord> generateHoodieTestRecords(int from, int limit)
       throws IOException, URISyntaxException {
     List<IndexedRecord> records = generateTestRecords(from, limit);
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
     Schema hoodieFieldsSchema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     return records.stream().map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord) s, hoodieFieldsSchema)).map(p -> {
-      p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, UUID.randomUUID().toString());
+      p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, genRandomUUID());
       p.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00");
       p.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime);
       return p;
     }).collect(Collectors.toList());
-
   }
 
-  public static List<HoodieRecord> generateHoodieTestRecords(int from, int limit, Schema schema)
+  public List<HoodieRecord> generateHoodieTestRecords(int from, int limit, Schema schema)
       throws IOException, URISyntaxException {
     List<IndexedRecord> records = generateTestRecords(from, limit);
     return records.stream().map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord) s, schema))
-        .map(p -> convertToHoodieRecords(p, UUID.randomUUID().toString(), "000/00/00")).collect(Collectors.toList());
+        .map(p -> convertToHoodieRecords(p, genRandomUUID(), "000/00/00")).collect(Collectors.toList());
   }
 
   private static HoodieRecord convertToHoodieRecords(IndexedRecord iRecord, String key, String partitionPath) {
@@ -169,11 +173,10 @@ public final class SchemaTestUtil {
 
   }
 
-  public static List<HoodieRecord> generateHoodieTestRecordsWithoutHoodieMetadata(int from, int limit)
+  public List<HoodieRecord> generateHoodieTestRecordsWithoutHoodieMetadata(int from, int limit)
       throws IOException, URISyntaxException {
-
     List<IndexedRecord> iRecords = generateTestRecords(from, limit);
-    return iRecords.stream().map(r -> new HoodieAvroRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+    return iRecords.stream().map(r -> new HoodieAvroRecord<>(new HoodieKey(genRandomUUID(), "0000/00/00"),
         new HoodieAvroPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
   }
 
@@ -358,4 +361,8 @@ public final class SchemaTestUtil {
       };
     }
   }
+
+  private String genRandomUUID() {
+    return genPseudoRandomUUID(random).toString();
+  }
 }
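
The net effect of the SchemaTestUtil change is that record-key generation moves from static methods backed by UUID.randomUUID() to instance methods backed by a seeded Random, so repeated runs of a test produce the same keys. A minimal sketch of the new call pattern (record counts are illustrative):

    // One SchemaTestUtil instance per test; keys come from genRandomUUID(), which draws
    // from the instance's Random(0xDEED) seed rather than UUID.randomUUID().
    SchemaTestUtil testUtil = new SchemaTestUtil();
    List<IndexedRecord> avroRecords = testUtil.generateHoodieTestRecords(0, 100);
    List<HoodieRecord> plainRecords = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 100);
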
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
index 9bbe4277162..ae62a718400 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
@@ -75,7 +75,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
   @ValueSource(booleans = {false, true})
   public void testSimpleInsert(boolean isCompressionEnabled) throws IOException, URISyntaxException {
     BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, isCompressionEnabled);
-    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
 
     Map<String, IndexedRecord> originalRecords = iRecords.stream()
@@ -100,7 +101,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
   @ValueSource(booleans = {false, true})
   public void testSimpleInsertWithoutHoodieMetadata(boolean isCompressionEnabled) throws IOException, URISyntaxException {
     BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, isCompressionEnabled);
-    List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
     Set<String> recordKeys = new HashSet<>();
     // insert generated records into the map
     hoodieRecords.forEach(r -> {
@@ -126,7 +128,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
 
     BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, isCompressionEnabled);
-    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
 
     // perform some inserts
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
@@ -137,7 +140,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
 
     // generate updates from inserts
     List<IndexedRecord> updatedRecords = SchemaTestUtil.updateHoodieTestRecords(recordKeys,
-        SchemaTestUtil.generateHoodieTestRecords(0, 100), HoodieActiveTimeline.createNewInstantTime());
+        testUtil.generateHoodieTestRecords(0, 100), HoodieActiveTimeline.createNewInstantTime());
     String newCommitTime =
         ((GenericRecord) updatedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
 
@@ -169,7 +172,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
     Schema schema = SchemaTestUtil.getSimpleSchema();
 
     // Test sizeEstimator without hoodie metadata fields
-    List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecords(0, 1, schema);
 
     long payloadSize =
         SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), new HoodieRecordSizeEstimator(schema));
@@ -177,7 +181,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
 
     // Test sizeEstimator with hoodie metadata fields
     schema = HoodieAvroUtils.addMetadataFields(schema);
-    hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
+    hoodieRecords = testUtil.generateHoodieTestRecords(0, 1, schema);
     payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), new HoodieRecordSizeEstimator(schema));
     assertTrue(payloadSize > 0);
 
@@ -185,7 +189,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
 
     // Test sizeEstimator without hoodie metadata fields and without schema object in the payload
     schema = SchemaTestUtil.getSimpleSchema();
-    List<IndexedRecord> indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
+    List<IndexedRecord> indexedRecords = testUtil.generateHoodieTestRecords(0, 1);
     hoodieRecords =
         indexedRecords.stream().map(r -> new HoodieAvroRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
             new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
@@ -194,7 +198,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
 
     // Test sizeEstimator with hoodie metadata fields and without schema object in the payload
     final Schema simpleSchemaWithMetadata = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
-    indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
+    indexedRecords = testUtil.generateHoodieTestRecords(0, 1);
     hoodieRecords = indexedRecords.stream()
         .map(r -> new HoodieAvroRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
             new AvroBinaryTestPayload(
@@ -208,7 +212,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
   @ValueSource(booleans = {false, true})
   public void testPutAll(boolean isCompressionEnabled) throws IOException, URISyntaxException {
     BitCaskDiskMap<String, HoodieRecord> records = new BitCaskDiskMap<>(basePath, isCompressionEnabled);
-    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
     Map<String, HoodieRecord> recordMap = new HashMap<>();
     iRecords.forEach(r -> {
       String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -235,7 +240,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
   public void testSizeEstimatorPerformance() throws IOException, URISyntaxException {
     // Test sizeEstimatorPerformance with simpleSchema
     Schema schema = SchemaTestUtil.getSimpleSchema();
-    List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecords(0, 1, schema);
     HoodieRecordSizeEstimator sizeEstimator = new HoodieRecordSizeEstimator<>(schema);
     HoodieRecord record = hoodieRecords.remove(0);
     long startTime = System.currentTimeMillis();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
index e33baf1493a..4cd34dbdab1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
@@ -82,7 +82,8 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
         new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
             new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B
 
-    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
     assert (recordKeys.size() == 100);
 
@@ -115,7 +116,8 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
         new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
             new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B
 
-    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
     assert (recordKeys.size() == 100);
     Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
@@ -124,7 +126,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
       assert recordKeys.contains(rec.getRecordKey());
     }
     List<IndexedRecord> updatedRecords = SchemaTestUtil.updateHoodieTestRecords(recordKeys,
-        SchemaTestUtil.generateHoodieTestRecords(0, 100), HoodieActiveTimeline.createNewInstantTime());
+        testUtil.generateHoodieTestRecords(0, 100), HoodieActiveTimeline.createNewInstantTime());
 
     // update records already inserted
     SpillableMapTestUtils.upsertRecords(updatedRecords, records);
@@ -154,7 +156,8 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
         new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
             new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B
 
-    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
     // insert a bunch of records so that values spill to disk too
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
     IndexedRecord inMemoryRecord = iRecords.get(0);
@@ -210,7 +213,8 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
         failureOutputPath, new DefaultSizeEstimator(),
         new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B
 
-    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
     assert (recordKeys.size() == 100);
     Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
@@ -236,7 +240,8 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
     List<String> recordKeys = new ArrayList<>();
     // Ensure we spill to disk
     while (records.getDiskBasedMapNumEntries() < 1) {
-      List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+      SchemaTestUtil testUtil = new SchemaTestUtil();
+      List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
       recordKeys.addAll(SpillableMapTestUtils.upsertRecords(iRecords, records));
     }
 
@@ -288,10 +293,12 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
         new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
             new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B
 
+    SchemaTestUtil testUtil = new SchemaTestUtil();
     List<String> recordKeys = new ArrayList<>();
+
     // Ensure we spill to disk
     while (records.getDiskBasedMapNumEntries() < 1) {
-      List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 100);
+      List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 100);
       hoodieRecords.stream().forEach(r -> {
         records.put(r.getRecordKey(), r);
         recordKeys.add(r.getRecordKey());
@@ -354,16 +361,17 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
             new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled);
 
     List<String> recordKeys = new ArrayList<>();
+    SchemaTestUtil testUtil = new SchemaTestUtil();
 
     // Put a single record. Payload size estimation happens as part of this initial put.
-    HoodieRecord seedRecord = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
+    HoodieRecord seedRecord = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
     records.put(seedRecord.getRecordKey(), seedRecord);
 
     // Remove the key immediately to make the map empty again.
     records.remove(seedRecord.getRecordKey());
 
     // Verify payload size re-estimation does not throw exception
-    List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
+    List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
     hoodieRecords.stream().forEach(hoodieRecord -> {
       assertDoesNotThrow(() -> {
         records.put(hoodieRecord.getRecordKey(), hoodieRecord);
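
The spillable-map tests above all drive the same pattern: a deliberately tiny in-memory budget so that records overflow into the disk-backed map. A condensed, hedged sketch of that pattern (constructor arguments mirror the test code; `basePath`, `schema`, `diskMapType` and `isCompressionEnabled` are assumed to come from the parameterized test setup):

    // A 16-byte in-memory budget forces nearly every record into the disk-backed map.
    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
            new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled);
    SchemaTestUtil testUtil = new SchemaTestUtil();
    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
    // with a 16B budget, most of these entries end up behind getDiskBasedMapNumEntries()
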
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java
index 5b71b5ec242..96a106ec901 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java
@@ -50,7 +50,8 @@ public class TestRocksDbBasedMap extends HoodieCommonTestHarness {
   @Test
   public void testSimple() throws IOException, URISyntaxException {
     RocksDbDiskMap records = new RocksDbDiskMap(basePath);
-    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
     ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java
index 31daaab2136..e4dd59fd0a1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java
@@ -104,7 +104,8 @@ public class TestRocksDbDiskMap extends HoodieCommonTestHarness {
   @Test
   public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException {
     RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
-    List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
     Set<String> recordKeys = new HashSet<>();
     // insert generated records into the map
     hoodieRecords.forEach(r -> {
@@ -128,14 +129,15 @@ public class TestRocksDbDiskMap extends HoodieCommonTestHarness {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
 
     RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
-    List<IndexedRecord> insertedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> insertedRecords = testUtil.generateHoodieTestRecords(0, 100);
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(insertedRecords, rocksDBBasedMap);
     String oldCommitTime =
         ((GenericRecord) insertedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
 
     // generate updates from inserts for first 50 keys / subset of keys
     List<IndexedRecord> updatedRecords = SchemaTestUtil.updateHoodieTestRecords(recordKeys.subList(0, 50),
-        SchemaTestUtil.generateHoodieTestRecords(0, 50), HoodieActiveTimeline.createNewInstantTime());
+        testUtil.generateHoodieTestRecords(0, 50), HoodieActiveTimeline.createNewInstantTime());
     String newCommitTime =
         ((GenericRecord) updatedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
 
@@ -161,7 +163,8 @@ public class TestRocksDbDiskMap extends HoodieCommonTestHarness {
   @Test
   public void testPutAll() throws IOException, URISyntaxException {
     RocksDbDiskMap<String, HoodieRecord> rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
-    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
     Map<String, HoodieRecord> recordMap = new HashMap<>();
     iRecords.forEach(r -> {
       String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -193,7 +196,8 @@ public class TestRocksDbDiskMap extends HoodieCommonTestHarness {
   }
 
   private  List<String> setupMapWithRecords(RocksDbDiskMap rocksDBBasedMap, int numRecords) throws IOException, URISyntaxException {
-    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, numRecords);
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, numRecords);
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
     // Ensure the number of records is correct
     assertEquals(rocksDBBasedMap.size(), recordKeys.size());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index d81745156a6..20ef9ded7c4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -50,6 +50,7 @@ import org.apache.spark.sql.types.StructType
 import java.io.Closeable
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.Try
 
 /**
@@ -61,7 +62,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
                       requiredSchema: HoodieTableSchema,
                       tableState: HoodieTableState,
                       config: Configuration)
-  extends CachingIterator[InternalRow] with Closeable with AvroDeserializerSupport {
+  extends CachingIterator[InternalRow] with AvroDeserializerSupport {
 
   protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
 
@@ -83,27 +84,27 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
   protected val requiredSchemaSafeAvroProjection: SafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
   protected val requiredSchemaUnsafeRowProjection: UnsafeProjection = HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, structTypeSchema)
 
-  // TODO: now logScanner with internalSchema support column project, we may no need projectAvroUnsafe
-  private var logScanner = {
+  private val logRecords = {
     val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
+
     scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
       maxCompactionMemoryInBytes, config, internalSchema)
   }
 
-  private val logRecords = logScanner.getRecords.asScala
-
   def logRecordsPairIterator(): Iterator[(String, HoodieRecord[_])] = {
     logRecords.iterator
   }
 
   // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's
   //       going to be used, since we modify `logRecords` before that and therefore can't do it any earlier
-  protected lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] = logRecords.iterator.map {
+  protected lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] =
+    logRecords.iterator.map {
       case (_, record: HoodieSparkRecord) => Option(record)
       case (_, _: HoodieEmptyRecord[_]) => Option.empty
       case (_, record) =>
         toScalaOption(record.toIndexedRecord(logFileReaderAvroSchema, payloadProps))
-  }
+
+    }
 
   protected def removeLogRecord(key: String): Option[HoodieRecord[_]] = logRecords.remove(key)
 
@@ -126,15 +127,6 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
       }
     }
   }
-
-  override def close(): Unit =
-    if (logScanner != null) {
-      try {
-        logScanner.close()
-      } finally {
-        logScanner = null
-      }
-    }
 }
 
 /**
@@ -261,7 +253,7 @@ object LogFileIterator {
               tableState: HoodieTableState,
               maxCompactionMemoryInBytes: Long,
               hadoopConf: Configuration,
-              internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): HoodieMergedLogRecordScanner = {
+              internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): mutable.Map[String, HoodieRecord[_]] = {
     val tablePath = tableState.tablePath
     val fs = FSUtils.getFs(tablePath, hadoopConf)
 
@@ -282,8 +274,16 @@ object LogFileIterator {
       // NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level
       //       of indirection among MT partitions)
       val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath)
-      metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)))
-        .getLeft
+
+      val logRecordReader =
+        metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)))
+          .getLeft
+
+      val recordList = closing(logRecordReader) {
+        logRecordReader.getRecords
+      }
+
+      mutable.HashMap(recordList.asScala.map(r => (r.getRecordKey, r)): _*)
     } else {
       val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
@@ -316,10 +316,21 @@ object LogFileIterator {
           getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
       }
 
-      val recordMerger = HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.recordMergerImpls.asJava, tableState.recordMergerStrategy)
-      logRecordScannerBuilder.withRecordMerger(recordMerger)
+      logRecordScannerBuilder.withRecordMerger(
+        HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.recordMergerImpls.asJava, tableState.recordMergerStrategy))
+
+      val scanner = logRecordScannerBuilder.build()
+
+      closing(scanner) {
+        // NOTE: We have to copy record-map (by default immutable copy is exposed)
+        mutable.HashMap(scanner.getRecords.asScala.toSeq: _*)
+      }
+    }
+  }
 
-      logRecordScannerBuilder.build()
+  def closing[T](c: Closeable)(f: => T): T = {
+    try { f } finally {
+      c.close()
     }
   }
 
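The Scala `closing` helper added above is a small loan pattern: evaluate the body, then close the resource even if the body throws. On the metadata-table path this means the new HoodieMetadataLogRecordReader is drained and released in one step. A hedged Java sketch of the same lifecycle (getLogRecordScanner(...), getLeft() and getRecords() appear in this diff; the owning types and surrounding variables are assumptions made for illustration):

    // Copy the merged records out of the reader, keyed by record key, then close it.
    static Map<String, HoodieRecord<?>> readMetadataLogRecords(HoodieBackedTableMetadata metadataTable,
                                                               List<HoodieLogFile> logFiles,
                                                               String partitionName) throws IOException {
      HoodieMetadataLogRecordReader reader =
          metadataTable.getLogRecordScanner(logFiles, partitionName, Option.of(true)).getLeft();
      Map<String, HoodieRecord<?>> recordsByKey = new HashMap<>();
      try {
        for (HoodieRecord<?> record : reader.getRecords()) {
          recordsByKey.put(record.getRecordKey(), record);
        }
      } finally {
        reader.close();
      }
      return recordsByKey;
    }
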
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index 52570596e17..eaf977e82d1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -488,8 +488,9 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
     val schema: Schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema)
     val testTable: HoodieSparkWriteableTestTable = HoodieSparkWriteableTestTable.of(metaClient, schema)
 
-    val hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema)
-    val hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema)
+    val testUtil = new SchemaTestUtil
+    val hoodieRecords1 = testUtil.generateHoodieTestRecords(0, 100, schema)
+    val hoodieRecords2 = testUtil.generateHoodieTestRecords(100, 100, schema)
     testTable.addCommit("20160401010101")
       .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
     testTable.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2)