Posted to commits@hudi.apache.org by yi...@apache.org on 2023/01/30 04:56:35 UTC

[hudi] 02/19: [HUDI-5534] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table (#7642)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9ac8765a3a79d2cffe8a43ac6f33e3986f91baf7
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Fri Jan 27 15:47:12 2023 -0800

    [HUDI-5534] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table (#7642)
    
    Recently, using the Metadata Table (MT) in Bloom Index resulted in failures due to exhaustion of the S3 connection pool, no matter how large (within reason) we set the pool size (we tested up to 3k connections).
    
    This PR optimizes the Bloom Index lookup sequence for the case where it leverages the Bloom Filter partition of the Metadata Table. The change is based on the following observations:
    
    Increasing the size of the batch of requests to MT amortizes the cost of processing it (the bigger the batch, the lower the cost).
    
    Having too few partitions in the Bloom Index path, however, starts to hurt parallelism when we actually probe individual files to check whether they contain the target keys. The solution is to split these 2 steps into different stages w/ drastically different parallelism levels: constrain parallelism when reading from MT (10s of tasks) and keep it at the current level for probing individual files (100s of tasks).
    
    The current way of partitioning records (relying on Spark's default partitioner) meant that every Spark executor would, with high likelihood, open (and process) every file-group of the MT Bloom Filter partition. To alleviate that, the same hashing algorithm used by MT should be used to partition records into Spark's individual partitions, so that every task is limited to opening no more than 1 file-group in the Bloom Filter partition of MT.
    
    To achieve that, the following changes to the Bloom Index sequence (leveraging MT) are implemented:
    
    Bloom Filter probing and actual file probing are split into 2 separate operations (so that the parallelism of each can be controlled individually)
    Requests to MT are replaced with invocations of batch APIs
    A custom partitioner, AffineBloomIndexFileGroupPartitioner, is introduced; it repartitions the dataset of filenames with corresponding record keys in a way that is affine w/ MT Bloom Filters' partitioning (allowing us to open no more than a single file-group per Spark task)
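
    The idea of an affine partitioner can be sketched as follows. This is a minimal illustration in plain Java, not the AffineBloomIndexFileGroupPartitioner added by this commit: the class name AffinePartitionerSketch, its methods, and the use of String.hashCode as a stand-in for MT's hashing are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; NOT Hudi's AffineBloomIndexFileGroupPartitioner.
final class AffinePartitionerSketch {

  // Stand-in for the Metadata Table's key-to-file-group hashing.
  // Assumption: String.hashCode is used only for illustration.
  static int fileGroupIndex(String fileName, int numFileGroups) {
    return Math.floorMod(fileName.hashCode(), numFileGroups);
  }

  // One partition per MT file-group: the partition id equals the file-group index,
  // so a task handling one partition needs to open only one file-group.
  static int partitionFor(String fileName, int numFileGroups) {
    return fileGroupIndex(fileName, numFileGroups);
  }

  // Buckets file names the way a shuffle driven by partitionFor(...) would.
  static List<List<String>> repartition(List<String> fileNames, int numFileGroups) {
    List<List<String>> partitions = new ArrayList<>();
    for (int i = 0; i < numFileGroups; i++) {
      partitions.add(new ArrayList<>());
    }
    for (String fileName : fileNames) {
      partitions.get(partitionFor(fileName, numFileGroups)).add(fileName);
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList("f1.parquet", "f2.parquet", "f3.parquet", "f4.parquet");
    List<List<String>> partitions = repartition(files, 4);
    for (int p = 0; p < partitions.size(); p++) {
      System.out.println("partition " + p + " -> " + partitions.get(p));
    }
  }
}
```

    Because the partition id is derived from the same function that selects the file-group, every file name routed to a partition resolves to that partition's file-group, which is the property the commit message describes.
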
    Additionally, this PR addresses some low-hanging performance optimizations that could considerably improve the performance of the Bloom Index lookup sequence, such as mapping file-comparison pairs to PairRDD (where the key is the file name and the value is the record key) instead of RDD, so that we can:
    
    Do in-partition sorting by filename (to make sure we check all records w/in a file at once) w/in a single Spark partition instead of a global sort (reducing shuffling as well)
    Avoid re-shuffling (by re-mapping from RDD to PairRDD later)
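
    The effect of in-partition sorting can be sketched in plain Java. This is an illustration only, not code from this commit: InPartitionSortSketch and groupKeysByFile are hypothetical names, and a single list stands in for the contents of one Spark partition.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of sorting (fileName, recordKey) pairs within one partition.
final class InPartitionSortSketch {

  // Sorts the partition's pairs by file name, then walks them once and starts
  // a new batch whenever the file name changes, so each file is visited once.
  static Map<String, List<String>> groupKeysByFile(List<Map.Entry<String, String>> partition) {
    List<Map.Entry<String, String>> sorted = new ArrayList<>(partition);
    sorted.sort(Map.Entry.comparingByKey()); // local sort only, no global shuffle

    Map<String, List<String>> batches = new LinkedHashMap<>();
    String currentFile = null;
    List<String> currentKeys = null;
    for (Map.Entry<String, String> pair : sorted) {
      if (!pair.getKey().equals(currentFile)) {
        currentFile = pair.getKey();
        currentKeys = new ArrayList<>();
        batches.put(currentFile, currentKeys);
      }
      currentKeys.add(pair.getValue());
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> partition = new ArrayList<>();
    partition.add(new SimpleEntry<>("f2", "k1"));
    partition.add(new SimpleEntry<>("f1", "k2"));
    partition.add(new SimpleEntry<>("f2", "k3"));
    System.out.println(groupKeysByFile(partition)); // prints {f1=[k2], f2=[k1, k3]}
  }
}
```

    Keeping the file name as the key of each pair is what lets the sort and the later per-file batching work on the same structure without re-mapping the data.
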
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |   5 +-
 .../index/bloom/BaseHoodieBloomIndexHelper.java    |   5 +-
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |  77 +++----
 ...ion.java => HoodieBloomIndexCheckFunction.java} |  59 +++---
 .../hudi/index/bloom/HoodieGlobalBloomIndex.java   |  14 +-
 .../bloom/ListBasedHoodieBloomIndexHelper.java     |  26 +--
 .../index/bloom/TestFlinkHoodieBloomIndex.java     |   8 +-
 .../org/apache/hudi/data/HoodieJavaPairRDD.java    |   5 +
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |  14 +-
 .../bloom/BucketizedBloomCheckPartitioner.java     |  18 +-
 .../bloom/HoodieBloomFilterProbingResult.java      |  35 ++++
 .../index/bloom/HoodieBloomIndexCheckFunction.java | 120 -----------
 .../index/bloom/HoodieFileProbingFunction.java     | 143 +++++++++++++
 .../HoodieMetadataBloomFilterProbingFunction.java  | 157 ++++++++++++++
 .../HoodieMetadataBloomIndexCheckFunction.java     | 154 --------------
 .../index/bloom/SparkHoodieBloomIndexHelper.java   | 229 ++++++++++++++++++---
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   4 +-
 .../bloom/TestBucketizedBloomCheckPartitioner.java |  43 ++--
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |  10 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |  10 +-
 .../org/apache/hudi/common/data/HoodieData.java    |  17 +-
 .../apache/hudi/common/data/HoodieListData.java    |  10 +
 .../hudi/common/data/HoodieListPairData.java       |  23 +++
 .../apache/hudi/common/data/HoodiePairData.java    |  11 +-
 .../ThrowingConsumer.java}                         |  31 +--
 .../hudi/common/model/HoodieAvroIndexedRecord.java |   2 +-
 .../hudi/common/table/HoodieTableMetaClient.java   |   4 +
 .../common/util/collection/FlatteningIterator.java |  55 +++++
 .../common/util/collection/MappingIterator.java    |   4 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  53 +++--
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  34 ++-
 .../metadata/HoodieMetadataFileSystemView.java     |   5 +-
 .../main/java/org/apache/hudi/util/Transient.java  | 108 ++++++++++
 .../hudi/common/util/collection/TestIterators.java |  48 +++++
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |   4 +-
 35 files changed, 1038 insertions(+), 507 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 8a8a03e1b17..2ec46f19a43 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -62,9 +62,8 @@ public class HoodieIndexUtils {
    * @param hoodieTable Instance of {@link HoodieTable} of interest
    * @return the list of {@link HoodieBaseFile}
    */
-  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
-      final String partition,
-      final HoodieTable hoodieTable) {
+  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(String partition,
+                                                                    HoodieTable hoodieTable) {
     Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
         .filterCompletedInstants().lastInstant();
     if (latestCommitTime.isPresent()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java
index 9430d9bb5e5..f144540ed22 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java
@@ -19,12 +19,11 @@
 
 package org.apache.hudi.index.bloom;
 
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
@@ -51,7 +50,7 @@ public abstract class BaseHoodieBloomIndexHelper implements Serializable {
   public abstract HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition);
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index 57d9def9b42..cf067c3a991 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -35,7 +36,6 @@ import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.MetadataNotFoundException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndexUtils;
@@ -45,9 +45,9 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.groupingBy;
@@ -127,7 +127,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
 
     // Step 3: Obtain a HoodieData, for each incoming record, that already exists, with the file id,
     // that contains it.
-    HoodieData<Pair<String, HoodieKey>> fileComparisonPairs =
+    HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs =
         explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs);
 
     return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable,
@@ -210,34 +210,35 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
     // also obtain file ranges, if range pruning is enabled
     context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName());
 
-    final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
-    return context.flatMap(partitions, partitionName -> {
-      // Partition and file name pairs
-      List<Pair<String, String>> partitionFileNameList = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
-              hoodieTable).stream().map(baseFile -> Pair.of(partitionName, baseFile.getFileName()))
-          .sorted()
-          .collect(toList());
-      if (partitionFileNameList.isEmpty()) {
-        return Stream.empty();
-      }
-      try {
-        Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
-            hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
-        List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
-        for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
-          result.add(Pair.of(entry.getKey().getLeft(),
-              new BloomIndexFileInfo(
-                  FSUtils.getFileId(entry.getKey().getRight()),
-                  // NOTE: Here we assume that the type of the primary key field is string
-                  (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
-                  (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
-              )));
-        }
-        return result.stream();
-      } catch (MetadataNotFoundException me) {
-        throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me);
-      }
-    }, Math.max(partitions.size(), 1));
+    String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+
+    // Partition and file name pairs
+    List<Pair<String, String>> partitionFileNameList =
+        HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
+            .map(partitionBaseFilePair -> Pair.of(partitionBaseFilePair.getLeft(), partitionBaseFilePair.getRight().getFileName()))
+            .sorted()
+            .collect(toList());
+
+    if (partitionFileNameList.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
+        hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
+
+    List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>(fileToColumnStatsMap.size());
+
+    for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
+      result.add(Pair.of(entry.getKey().getLeft(),
+          new BloomIndexFileInfo(
+              FSUtils.getFileId(entry.getKey().getRight()),
+              // NOTE: Here we assume that the type of the primary key field is string
+              (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
+              (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
+          )));
+    }
+
+    return result;
   }
 
   @Override
@@ -278,7 +279,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
    * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
    * recordKey ranges in the index info.
    */
-  HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
+  HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       HoodiePairData<String, String> partitionRecordKeyPairs) {
     IndexFileFilter indexFileFilter =
@@ -289,11 +290,13 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
       String recordKey = partitionRecordKeyPair.getRight();
       String partitionPath = partitionRecordKeyPair.getLeft();
 
-      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
-          .map(partitionFileIdPair -> (Pair<String, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
-              new HoodieKey(recordKey, partitionPath)))
-          .collect(Collectors.toList());
-    }).flatMap(List::iterator);
+      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey)
+          .stream()
+          .map(partitionFileIdPair ->
+              new ImmutablePair<>(
+                  new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey));
+    })
+        .flatMapToPair(Stream::iterator);
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
similarity index 61%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
index 80031f4e8f0..52b504e9ab1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
@@ -19,7 +19,8 @@
 package org.apache.hudi.index.bloom;
 
 import org.apache.hudi.client.utils.LazyIterableIterator;
-import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -28,53 +29,67 @@ import org.apache.hudi.io.HoodieKeyLookupHandle;
 import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.table.HoodieTable;
 
-import java.util.function.Function;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Function;
 
 /**
- * Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files.
+ * Function accepting a tuple of {@link HoodieFileGroupId} and a record key and producing
+ * a list of {@link HoodieKeyLookupResult} for every file identified by the file-group ids
+ *
+ * @param <I> type of the tuple of {@code (HoodieFileGroupId, <record-key>)}. Note that this is
+ *           parameterized as generic such that this code could be reused for Spark as well
  */
-public class HoodieBaseBloomIndexCheckFunction
-    implements Function<Iterator<Pair<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
+public class HoodieBloomIndexCheckFunction<I> 
+    implements Function<Iterator<I>, Iterator<List<HoodieKeyLookupResult>>>, Serializable {
 
   private final HoodieTable hoodieTable;
 
   private final HoodieWriteConfig config;
 
-  public HoodieBaseBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+  private final SerializableFunction<I, HoodieFileGroupId> fileGroupIdExtractor;
+  private final SerializableFunction<I, String> recordKeyExtractor;
+
+  public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable,
+                                       HoodieWriteConfig config,
+                                       SerializableFunction<I, HoodieFileGroupId> fileGroupIdExtractor,
+                                       SerializableFunction<I, String> recordKeyExtractor) {
     this.hoodieTable = hoodieTable;
     this.config = config;
+    this.fileGroupIdExtractor = fileGroupIdExtractor;
+    this.recordKeyExtractor = recordKeyExtractor;
   }
 
   @Override
-  public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
-    return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
+  public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<I> fileGroupIdRecordKeyPairIterator) {
+    return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
   }
 
-  class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<HoodieKeyLookupResult>> {
+  protected class LazyKeyCheckIterator extends LazyIterableIterator<I, List<HoodieKeyLookupResult>> {
 
     private HoodieKeyLookupHandle keyLookupHandle;
 
-    LazyKeyCheckIterator(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
+    LazyKeyCheckIterator(Iterator<I> filePartitionRecordKeyTripletItr) {
       super(filePartitionRecordKeyTripletItr);
     }
 
-    @Override
-    protected void start() {
-    }
-
     @Override
     protected List<HoodieKeyLookupResult> computeNext() {
+
       List<HoodieKeyLookupResult> ret = new ArrayList<>();
       try {
         // process one file in each go.
         while (inputItr.hasNext()) {
-          Pair<String, HoodieKey> currentTuple = inputItr.next();
-          String fileId = currentTuple.getLeft();
-          String partitionPath = currentTuple.getRight().getPartitionPath();
-          String recordKey = currentTuple.getRight().getRecordKey();
+          I tuple = inputItr.next();
+
+          HoodieFileGroupId fileGroupId = fileGroupIdExtractor.apply(tuple);
+          String recordKey = recordKeyExtractor.apply(tuple);
+
+          String fileId = fileGroupId.getFileId();
+          String partitionPath = fileGroupId.getPartitionPath();
+
           Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
 
           // lazily init state
@@ -100,15 +115,13 @@ public class HoodieBaseBloomIndexCheckFunction
         }
       } catch (Throwable e) {
         if (e instanceof HoodieException) {
-          throw e;
+          throw (HoodieException) e;
         }
+
         throw new HoodieIndexException("Error checking bloom filter index. ", e);
       }
-      return ret;
-    }
 
-    @Override
-    protected void end() {
+      return ret;
     }
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index 5f2007ea536..b5604312d3f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -41,7 +42,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * This filter will only work with hoodie table since it will only load partitions
@@ -74,7 +75,7 @@ public class HoodieGlobalBloomIndex extends HoodieBloomIndex {
    */
 
   @Override
-  HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
+  HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       HoodiePairData<String, String> partitionRecordKeyPairs) {
 
@@ -87,10 +88,11 @@ public class HoodieGlobalBloomIndex extends HoodieBloomIndex {
       String partitionPath = partitionRecordKeyPair.getLeft();
 
       return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
-          .map(partitionFileIdPair -> (Pair<String, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
-              new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
-          .collect(Collectors.toList());
-    }).flatMap(List::iterator);
+          .map(partitionFileIdPair ->
+              new ImmutablePair<>(
+                  new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey));
+    })
+        .flatMapToPair(Stream::iterator);
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
index cffee5ee740..b47f5cf066c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
@@ -19,20 +19,20 @@
 
 package org.apache.hudi.index.bloom;
 
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.table.HoodieTable;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -56,21 +56,21 @@ public class ListBasedHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper
   public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, Long> recordsPerPartition) {
-    List<Pair<String, HoodieKey>> fileComparisonPairList =
+    List<Pair<HoodieFileGroupId, String>> fileComparisonPairList =
         fileComparisonPairs.collectAsList().stream()
             .sorted(Comparator.comparing(Pair::getLeft)).collect(toList());
 
-    List<HoodieKeyLookupResult> keyLookupResults = new ArrayList<>();
-    Iterator<List<HoodieKeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(
-        hoodieTable, config).apply(fileComparisonPairList.iterator());
-    while (iterator.hasNext()) {
-      keyLookupResults.addAll(iterator.next());
-    }
+    List<HoodieKeyLookupResult> keyLookupResults =
+        CollectionUtils.toStream(
+          new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
+              .apply(fileComparisonPairList.iterator())
+        )
+            .flatMap(Collection::stream)
+            .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
+            .collect(toList());
 
-    keyLookupResults = keyLookupResults.stream().filter(
-        lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList());
     return context.parallelize(keyLookupResults).flatMap(lookupResult ->
         lookupResult.getMatchingRecordKeys().stream()
             .map(recordKey -> new ImmutablePair<>(lookupResult, recordKey)).iterator()
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index 6dd5a1c27e2..d4b4007bedb 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.data.HoodieListPairData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -186,11 +187,14 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
           partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
         });
 
-    List<Pair<String, HoodieKey>> comparisonKeyList = index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();
+    List<Pair<HoodieFileGroupId, String>> comparisonKeyList =
+        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();
 
     assertEquals(10, comparisonKeyList.size());
     java.util.Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
-        .collect(java.util.stream.Collectors.groupingBy(t -> t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t -> t.getLeft(), java.util.stream.Collectors.toList())));
+        .collect(
+            java.util.stream.Collectors.groupingBy(t -> t.getRight(),
+              java.util.stream.Collectors.mapping(t -> t.getLeft().getFileId(), java.util.stream.Collectors.toList())));
 
     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("002")));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
index 9ec3c4cf715..9019fb43ff0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
@@ -121,6 +121,11 @@ public class HoodieJavaPairRDD<K, V> implements HoodiePairData<K, V> {
         tuple -> func.apply(new ImmutablePair<>(tuple._1, tuple._2))));
   }
 
+  @Override
+  public <W> HoodiePairData<K, W> mapValues(SerializableFunction<V, W> func) {
+    return HoodieJavaPairRDD.of(pairRDDData.mapValues(func::apply));
+  }
+
   @Override
   public <L, W> HoodiePairData<L, W> mapToPair(SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc) {
     return HoodieJavaPairRDD.of(pairRDDData.mapToPair(pair -> {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index ed9613bc15f..6ed3a854962 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -24,17 +24,16 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.collection.MappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
-
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.storage.StorageLevel;
+import scala.Tuple2;
 
 import java.util.Iterator;
 import java.util.List;
 
-import scala.Tuple2;
-
 /**
  * Holds a {@link JavaRDD} of objects.
  *
@@ -119,9 +118,18 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
 
   @Override
   public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
+    // NOTE: Unrolling this lambda into a method reference results in [[ClassCastException]]
+    //       due to weird interop b/w Scala and Java
     return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e)));
   }
 
+  @Override
+  public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, Iterator<? extends Pair<K, V>>> func) {
+    return HoodieJavaPairRDD.of(
+        rddData.flatMapToPair(e ->
+            new MappingIterator<>(func.apply(e), p -> new Tuple2<>(p.getKey(), p.getValue()))));
+  }
+
   @Override
   public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, V> func) {
     return HoodieJavaPairRDD.of(rddData.mapToPair(input -> {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
index 36710dc02bb..48099220dbf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.index.bloom;
 
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -63,7 +64,7 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
   /**
    * Stores the final mapping of a file group to a list of partitions for its keys.
    */
-  private Map<String, List<Integer>> fileGroupToPartitions;
+  private Map<HoodieFileGroupId, List<Integer>> fileGroupToPartitions;
 
   /**
    * Create a partitioner that computes a plan based on provided workload characteristics.
@@ -72,11 +73,11 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
    * @param fileGroupToComparisons number of expected comparisons per file group
    * @param keysPerBucket maximum number of keys to pack in a single bucket
    */
-  public BucketizedBloomCheckPartitioner(int targetPartitions, Map<String, Long> fileGroupToComparisons,
+  public BucketizedBloomCheckPartitioner(int targetPartitions, Map<HoodieFileGroupId, Long> fileGroupToComparisons,
       int keysPerBucket) {
     this.fileGroupToPartitions = new HashMap<>();
 
-    Map<String, Integer> bucketsPerFileGroup = new HashMap<>();
+    Map<HoodieFileGroupId, Integer> bucketsPerFileGroup = new HashMap<>();
     // Compute the buckets needed per file group, using simple uniform distribution
     fileGroupToComparisons.forEach((f, c) -> bucketsPerFileGroup.put(f, (int) Math.ceil((c * 1.0) / keysPerBucket)));
     int totalBuckets = bucketsPerFileGroup.values().stream().mapToInt(i -> i).sum();
@@ -90,9 +91,9 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
     int minBucketsPerPartition = Math.max((int) Math.floor((1.0 * totalBuckets) / partitions), 1);
     LOG.info(String.format("TotalBuckets %d, min_buckets/partition %d", totalBuckets, minBucketsPerPartition));
     int[] bucketsFilled = new int[partitions];
-    Map<String, AtomicInteger> bucketsFilledPerFileGroup = new HashMap<>();
+    Map<HoodieFileGroupId, AtomicInteger> bucketsFilledPerFileGroup = new HashMap<>();
     int partitionIndex = 0;
-    for (Map.Entry<String, Integer> e : bucketsPerFileGroup.entrySet()) {
+    for (Map.Entry<HoodieFileGroupId, Integer> e : bucketsPerFileGroup.entrySet()) {
       for (int b = 0; b < Math.max(1, e.getValue() - 1); b++) {
         // keep filled counts upto date
         bucketsFilled[partitionIndex]++;
@@ -115,7 +116,7 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
     // PHASE 2 : for remaining unassigned buckets, round robin over partitions once. Since we withheld 1 bucket from
     // each file group uniformly, this remaining is also an uniform mix across file groups. We just round robin to
     // optimize for goal 2.
-    for (Map.Entry<String, Integer> e : bucketsPerFileGroup.entrySet()) {
+    for (Map.Entry<HoodieFileGroupId, Integer> e : bucketsPerFileGroup.entrySet()) {
       int remaining = e.getValue() - bucketsFilledPerFileGroup.get(e.getKey()).intValue();
       for (int r = 0; r < remaining; r++) {
         // mark this partition against the file group
@@ -142,7 +143,8 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
 
   @Override
   public int getPartition(Object key) {
-    final Pair<String, String> parts = (Pair<String, String>) key;
+    final Pair<HoodieFileGroupId, String> parts = (Pair<HoodieFileGroupId, String>) key;
+    // TODO replace w/ more performant hash
     final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", parts.getRight());
     final List<Integer> candidatePartitions = fileGroupToPartitions.get(parts.getLeft());
     final int idx = (int) Math.floorMod((int) hashOfKey, candidatePartitions.size());
@@ -150,7 +152,7 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
     return candidatePartitions.get(idx);
   }
 
-  Map<String, List<Integer>> getFileGroupToPartitions() {
+  Map<HoodieFileGroupId, List<Integer>> getFileGroupToPartitions() {
     return fileGroupToPartitions;
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
new file mode 100644
index 00000000000..c124f8b27b8
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import java.util.List;
+
+class HoodieBloomFilterProbingResult {
+
+  private final List<String> candidateKeys;
+
+  HoodieBloomFilterProbingResult(List<String> candidateKeys) {
+    this.candidateKeys = candidateKeys;
+  }
+
+  public List<String> getCandidateKeys() {
+    return candidateKeys;
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
deleted file mode 100644
index e19a429ea72..00000000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.index.bloom;
-
-import org.apache.hudi.client.utils.LazyIterableIterator;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.io.HoodieKeyLookupHandle;
-import org.apache.hudi.io.HoodieKeyLookupResult;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.spark.api.java.function.Function2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import scala.Tuple2;
-
-/**
- * Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the actual files.
- */
-public class HoodieBloomIndexCheckFunction
-    implements Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
-
-  private final HoodieTable hoodieTable;
-
-  private final HoodieWriteConfig config;
-
-  public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
-    this.hoodieTable = hoodieTable;
-    this.config = config;
-  }
-
-  @Override
-  public Iterator<List<HoodieKeyLookupResult>> call(Integer partition,
-                                                    Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
-    return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
-  }
-
-  class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<HoodieKeyLookupResult>> {
-
-    private HoodieKeyLookupHandle keyLookupHandle;
-
-    LazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
-      super(filePartitionRecordKeyTripletItr);
-    }
-
-    @Override
-    protected void start() {
-    }
-
-    @Override
-    protected List<HoodieKeyLookupResult> computeNext() {
-
-      List<HoodieKeyLookupResult> ret = new ArrayList<>();
-      try {
-        // process one file in each go.
-        while (inputItr.hasNext()) {
-          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
-          String fileId = currentTuple._1;
-          String partitionPath = currentTuple._2.getPartitionPath();
-          String recordKey = currentTuple._2.getRecordKey();
-          Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
-
-          // lazily init state
-          if (keyLookupHandle == null) {
-            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
-          }
-
-          // if continue on current file
-          if (keyLookupHandle.getPartitionPathFileIDPair().equals(partitionPathFilePair)) {
-            keyLookupHandle.addKey(recordKey);
-          } else {
-            // do the actual checking of file & break out
-            ret.add(keyLookupHandle.getLookupResult());
-            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
-            keyLookupHandle.addKey(recordKey);
-            break;
-          }
-        }
-
-        // handle case, where we ran out of input, close pending work, update return val
-        if (!inputItr.hasNext()) {
-          ret.add(keyLookupHandle.getLookupResult());
-        }
-      } catch (Throwable e) {
-        if (e instanceof HoodieException) {
-          throw e;
-        }
-        throw new HoodieIndexException("Error checking bloom filter index. ", e);
-      }
-
-      return ret;
-    }
-
-    @Override
-    protected void end() {
-    }
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
new file mode 100644
index 00000000000..0809042c9fb
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function that probes the candidate keys that passed the Bloom Filter check (provided in
+ * {@link HoodieBloomFilterProbingResult}) within the corresponding files identified by {@link HoodieFileGroupId},
+ * validating whether a record with the given key is indeed persisted in that file
+ */
+public class HoodieFileProbingFunction implements
+    FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>, List<HoodieKeyLookupResult>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieFileProbingFunction.class);
+
+  // Assuming each file bloom filter takes up 512K, sizing the max file count
+  // per batch so that the total fetched bloom filters would not cross 128 MB.
+  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+
+  private final Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast;
+  private final SerializableConfiguration hadoopConf;
+
+  public HoodieFileProbingFunction(Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast,
+                                   SerializableConfiguration hadoopConf) {
+    this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast;
+    this.hadoopConf = hadoopConf;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> tuple2Iterator) throws Exception {
+    return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
+  }
+
+  private class BloomIndexLazyKeyCheckIterator
+      extends LazyIterableIterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>, List<HoodieKeyLookupResult>> {
+
+    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> tuple2Iterator) {
+      super(tuple2Iterator);
+    }
+
+    @Override
+    protected List<HoodieKeyLookupResult> computeNext() {
+      // Partition path and file name pair to the Bloom Filter probing result (i.e. its candidate keys)
+      final Map<Pair<String, String>, HoodieBloomFilterProbingResult> fileToLookupResults = new HashMap<>();
+      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+
+      while (inputItr.hasNext()) {
+        Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult> entry = inputItr.next();
+        final String partitionPath = entry._1.getPartitionPath();
+        final String fileId = entry._1.getFileId();
+
+        if (!fileIDBaseFileMap.containsKey(fileId)) {
+          Option<HoodieBaseFile> baseFile =
+              baseFileOnlyViewBroadcast.getValue().getLatestBaseFile(partitionPath, fileId);
+          if (!baseFile.isPresent()) {
+            throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath
+                + ", fileId: " + fileId);
+          }
+
+          fileIDBaseFileMap.put(fileId, baseFile.get());
+        }
+
+        fileToLookupResults.putIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()), entry._2);
+
+        if (fileToLookupResults.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+          break;
+        }
+      }
+
+      if (fileToLookupResults.isEmpty()) {
+        return Collections.emptyList();
+      }
+
+      return fileToLookupResults.entrySet().stream()
+          .map(entry -> {
+            Pair<String, String> partitionPathFileNamePair = entry.getKey();
+            HoodieBloomFilterProbingResult bloomFilterKeyLookupResult = entry.getValue();
+
+            final String partitionPath = partitionPathFileNamePair.getLeft();
+            final String fileName = partitionPathFileNamePair.getRight();
+            final String fileId = FSUtils.getFileId(fileName);
+            ValidationUtils.checkState(!fileId.isEmpty());
+
+            List<String> candidateRecordKeys = bloomFilterKeyLookupResult.getCandidateKeys();
+
+            // TODO add assertion that file is checked only once
+
+            final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId);
+            List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()),
+                candidateRecordKeys, hadoopConf.get());
+
+            LOG.debug(
+                String.format("Bloom filter candidates (%d) / false positives (%d), actual matches (%d)",
+                    candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
+
+            return new HoodieKeyLookupResult(fileId, partitionPath, dataFile.getCommitTime(), matchingKeys);
+          })
+          .collect(Collectors.toList());
+    }
+
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
new file mode 100644
index 00000000000..406be816500
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.FlatteningIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of the function probing the Bloom Filters of individual files to verify
+ * whether a particular record key could be stored in the latest file-slice of the file-group
+ * identified by the {@link HoodieFileGroupId}
+ */
+public class HoodieMetadataBloomFilterProbingFunction implements
+    PairFlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>, HoodieFileGroupId, HoodieBloomFilterProbingResult> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieMetadataBloomFilterProbingFunction.class);
+
+  // Assuming each file bloom filter takes up 512K, sizing the max file count
+  // per batch so that the total fetched bloom filters would not cross 128 MB.
+  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+  private final HoodieTable hoodieTable;
+
+  private final Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast;
+
+  /**
+   * NOTE: It's critical for this ctor to accept {@link HoodieTable} to make sure that it uses
+   *       the broadcast instance of {@link HoodieBackedTableMetadata} internally, instead of
+   *       one that is serialized and deserialized for _every_ task individually
+   *
+   * NOTE: We pass in a broadcast {@link HoodieTableFileSystemView} to make sure it's materialized
+   *       on each executor only once
+   */
+  public HoodieMetadataBloomFilterProbingFunction(Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast,
+                                                  HoodieTable hoodieTable) {
+    this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast;
+    this.hoodieTable = hoodieTable;
+  }
+
+  @Override
+  public Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> call(Iterator<Tuple2<HoodieFileGroupId, String>> tuple2Iterator) throws Exception {
+    return new FlatteningIterator<>(new BloomIndexLazyKeyCheckIterator(tuple2Iterator));
+  }
+
+  private class BloomIndexLazyKeyCheckIterator
+      extends LazyIterableIterator<Tuple2<HoodieFileGroupId, String>, Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>> {
+
+    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, String>> tuple2Iterator) {
+      super(tuple2Iterator);
+    }
+
+    @Override
+    protected Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> computeNext() {
+      // Partition path and file name pair to list of keys
+      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+
+      while (inputItr.hasNext()) {
+        Tuple2<HoodieFileGroupId, String> entry = inputItr.next();
+        String partitionPath = entry._1.getPartitionPath();
+        String fileId = entry._1.getFileId();
+
+        if (!fileIDBaseFileMap.containsKey(fileId)) {
+          Option<HoodieBaseFile> baseFile = baseFileOnlyViewBroadcast.getValue().getLatestBaseFile(partitionPath, fileId);
+          if (!baseFile.isPresent()) {
+            throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath
+                + ", fileId: " + fileId);
+          }
+          fileIDBaseFileMap.put(fileId, baseFile.get());
+        }
+
+        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()),
+            k -> new ArrayList<>()).add(new HoodieKey(entry._2, partitionPath));
+
+        if (fileToKeysMap.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+          break;
+        }
+      }
+
+      if (fileToKeysMap.isEmpty()) {
+        return Collections.emptyIterator();
+      }
+
+      List<Pair<String, String>> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet());
+      Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
+          hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
+
+      return fileToKeysMap.entrySet().stream()
+          .map(entry -> {
+            Pair<String, String> partitionPathFileNamePair = entry.getKey();
+            List<HoodieKey> hoodieKeyList = entry.getValue();
+
+            final String partitionPath = partitionPathFileNamePair.getLeft();
+            final String fileName = partitionPathFileNamePair.getRight();
+            final String fileId = FSUtils.getFileId(fileName);
+            ValidationUtils.checkState(!fileId.isEmpty());
+
+            if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) {
+              throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair);
+            }
+            final BloomFilter fileBloomFilter = fileToBloomFilterMap.get(partitionPathFileNamePair);
+
+            List<String> candidateRecordKeys = new ArrayList<>();
+            hoodieKeyList.forEach(hoodieKey -> {
+              if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) {
+                candidateRecordKeys.add(hoodieKey.getRecordKey());
+              }
+            });
+
+            LOG.debug(String.format("Total records (%d), bloom filter candidates (%d)",
+                hoodieKeyList.size(), candidateRecordKeys.size()));
+
+            return Tuple2.apply(new HoodieFileGroupId(partitionPath, fileId), new HoodieBloomFilterProbingResult(candidateRecordKeys));
+          })
+          .iterator();
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
deleted file mode 100644
index 8a2958eab9d..00000000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.index.bloom;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.client.utils.LazyIterableIterator;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.index.HoodieIndexUtils;
-import org.apache.hudi.io.HoodieKeyLookupResult;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.function.Function2;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Spark Function2 implementation for checking bloom filters for the
- * requested keys from the metadata table index. The bloom filter
- * checking for keys and the actual file verification for the
- * candidate keys is done in an iterative fashion. In each iteration,
- * bloom filters are requested for a batch of partition files and the
- * keys are checked against them.
- */
-public class HoodieMetadataBloomIndexCheckFunction implements
-    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieMetadataBloomIndexCheckFunction.class);
-
-  // Assuming each file bloom filter takes up 512K, sizing the max file count
-  // per batch so that the total fetched bloom filters would not cross 128 MB.
-  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
-  private final HoodieTable hoodieTable;
-
-  public HoodieMetadataBloomIndexCheckFunction(HoodieTable hoodieTable) {
-    this.hoodieTable = hoodieTable;
-  }
-
-  @Override
-  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
-    return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
-  }
-
-  private class BloomIndexLazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<HoodieKeyLookupResult>> {
-    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) {
-      super(tuple2Iterator);
-    }
-
-    @Override
-    protected void start() {
-    }
-
-    @Override
-    protected List<HoodieKeyLookupResult> computeNext() {
-      // Partition path and file name pair to list of keys
-      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
-      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
-      final List<HoodieKeyLookupResult> resultList = new ArrayList<>();
-
-      while (inputItr.hasNext()) {
-        Tuple2<String, HoodieKey> entry = inputItr.next();
-        final String partitionPath = entry._2.getPartitionPath();
-        final String fileId = entry._1;
-        if (!fileIDBaseFileMap.containsKey(fileId)) {
-          Option<HoodieBaseFile> baseFile = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
-          if (!baseFile.isPresent()) {
-            throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath
-                + ", fileId: " + fileId);
-          }
-          fileIDBaseFileMap.put(fileId, baseFile.get());
-        }
-        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()),
-            k -> new ArrayList<>()).add(entry._2);
-        if (fileToKeysMap.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
-          break;
-        }
-      }
-      if (fileToKeysMap.isEmpty()) {
-        return Collections.emptyList();
-      }
-
-      List<Pair<String, String>> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet());
-      Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
-          hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
-
-      final AtomicInteger totalKeys = new AtomicInteger(0);
-      fileToKeysMap.forEach((partitionPathFileNamePair, hoodieKeyList) -> {
-        final String partitionPath = partitionPathFileNamePair.getLeft();
-        final String fileName = partitionPathFileNamePair.getRight();
-        final String fileId = FSUtils.getFileId(fileName);
-        ValidationUtils.checkState(!fileId.isEmpty());
-
-        if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) {
-          throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair);
-        }
-        final BloomFilter fileBloomFilter = fileToBloomFilterMap.get(partitionPathFileNamePair);
-
-        List<String> candidateRecordKeys = new ArrayList<>();
-        hoodieKeyList.forEach(hoodieKey -> {
-          totalKeys.incrementAndGet();
-          if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) {
-            candidateRecordKeys.add(hoodieKey.getRecordKey());
-          }
-        });
-
-        final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId);
-        List<String> matchingKeys =
-            HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()), candidateRecordKeys,
-                hoodieTable.getHadoopConf());
-        LOG.debug(
-            String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)",
-                hoodieKeyList.size(), candidateRecordKeys.size(),
-                candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
-
-        resultList.add(new HoodieKeyLookupResult(fileId, partitionPath, dataFile.getCommitTime(), matchingKeys));
-      });
-      return resultList;
-    }
-
-    @Override
-    protected void end() {
-    }
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 5736024dc24..265b0507768 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -19,30 +19,47 @@
 
 package org.apache.hudi.index.bloom;
 
-import org.apache.hudi.common.data.HoodieData;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import scala.Tuple2;
-
+import static org.apache.hudi.metadata.HoodieMetadataPayload.getBloomFilterIndexKey;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 
 /**
@@ -55,8 +72,7 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
   private static final SparkHoodieBloomIndexHelper SINGLETON_INSTANCE =
       new SparkHoodieBloomIndexHelper();
 
-  private SparkHoodieBloomIndexHelper() {
-  }
+  private SparkHoodieBloomIndexHelper() {}
 
   public static SparkHoodieBloomIndexHelper getInstance() {
     return SINGLETON_INSTANCE;
@@ -66,42 +82,92 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
   public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition) {
-    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
-        HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
-            .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
 
-    int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
-    int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
-    LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
-        + config.getBloomIndexParallelism() + "}");
+    int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).getNumPartitions();
+    int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
 
+    // NOTE: Target parallelism could be overridden by the config
+    int targetParallelism =
+        configuredBloomIndexParallelism > 0 ? configuredBloomIndexParallelism : inputParallelism;
+
+    LOG.info(String.format("Input parallelism: %d, Index parallelism: %d", inputParallelism, targetParallelism));
+
+    JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD = HoodieJavaRDD.getJavaRDD(fileComparisonPairs);
     JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+
     if (config.getBloomIndexUseMetadata()
         && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
         .contains(BLOOM_FILTERS.getPartitionPath())) {
-      // Step 1: Sort by file id
-      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
-          fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
+      SerializableConfiguration hadoopConf = new SerializableConfiguration(hoodieTable.getHadoopConf());
+
+      HoodieTableFileSystemView baseFileOnlyView =
+          getBaseFileOnlyView(hoodieTable, partitionToFileInfo.keySet());
+
+      Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast =
+          ((HoodieSparkEngineContext) context).getJavaSparkContext().broadcast(baseFileOnlyView);
+
+      // When leveraging MT we're aiming for following goals:
+      //    - (G1) All requests to MT are made in batch (ie we're trying to fetch all the values
+      //      for corresponding keys at once)
+      //    - (G2) Each task reads no more than just _one_ file-group from the MT Bloom Filters
+      //    partition
+      //
+      // To achieve G2, the following invariant has to be maintained: Spark partitions have to be
+      // affine w/ Metadata Table's file-groups, meaning that each Spark partition holds records
+      // belonging to one and only one file-group in MT Bloom Filters partition. To provide for that
+      // we need to make sure that
+      //    - The [[Partitioner]] used by Spark employs the same hashing function as Metadata Table
+      //      (and is applied to the same keys as the MT one)
+      //    - The # of partitions is congruent to the # of file-groups (ie number of Spark
+      //      partitions is a multiple of the # of the file-groups).
+      //
+      //    Last provision is necessary, so that for every key it's the case that
+      //
+      //        (hash(key) % N) % M = hash(key) % M, iff N % M = 0
+      //
+      //    Let's take an example of N = 8 and M = 4 (default # of file-groups in Bloom Filter
+      //    partition). In that case Spark partitions for which `hash(key) % N` will be either 0
+      //    or 4, will map to the same (first) file-group in MT
+      int bloomFilterPartitionFileGroupCount =
+          config.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+      int adjustedTargetParallelism =
+          targetParallelism % bloomFilterPartitionFileGroupCount == 0
+              ? targetParallelism
+              // NOTE: We add 1 to make sure parallelism a) value always stays positive and b)
+              //       {@code targetParallelism <= adjustedTargetParallelism}
+              : (targetParallelism / bloomFilterPartitionFileGroupCount + 1) * bloomFilterPartitionFileGroupCount;
+
+      AffineBloomIndexFileGroupPartitioner partitioner =
+          new AffineBloomIndexFileGroupPartitioner(baseFileOnlyViewBroadcast, adjustedTargetParallelism);
+
+      // First, we need to repartition and sort records using [[AffineBloomIndexFileGroupPartitioner]]
+      // to make sure every Spark task accesses no more than just a single file-group in MT (allows
+      // us to achieve G2).
+      //
+      // NOTE: Sorting records w/in individual partitions is required to make sure that we cluster
+      //       together keys co-located w/in the MT files (sorted by keys)
+      keyLookupResultRDD = fileComparisonsRDD.repartitionAndSortWithinPartitions(partitioner)
+          .mapPartitionsToPair(new HoodieMetadataBloomFilterProbingFunction(baseFileOnlyViewBroadcast, hoodieTable))
+          // Second, we use [[HoodieFileProbingFunction]] to open actual file and check whether it
+          // contains the records with candidate keys that were filtered in by the Bloom Filter
+          .mapPartitions(new HoodieFileProbingFunction(baseFileOnlyViewBroadcast, hadoopConf), true);
 
-      // Step 2: Use bloom filter to filter and the actual log file to get the record location
-      keyLookupResultRDD = sortedFileIdAndKeyPairs.mapPartitionsWithIndex(
-          new HoodieMetadataBloomIndexCheckFunction(hoodieTable), true);
     } else if (config.useBloomIndexBucketizedChecking()) {
-      Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(
+      Map<HoodieFileGroupId, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(
           config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context);
-      Partitioner partitioner = new BucketizedBloomCheckPartitioner(joinParallelism, comparisonsPerFileGroup,
+      Partitioner partitioner = new BucketizedBloomCheckPartitioner(targetParallelism, comparisonsPerFileGroup,
           config.getBloomIndexKeysPerBucket());
 
-      keyLookupResultRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
+      keyLookupResultRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2), t))
           .repartitionAndSortWithinPartitions(partitioner)
           .map(Tuple2::_2)
-          .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true);
+          .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
     } else {
-      keyLookupResultRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism)
-          .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true);
+      keyLookupResultRDD = fileComparisonsRDD.sortByKey(true, targetParallelism)
+          .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
     }
 
     return HoodieJavaPairRDD.of(keyLookupResultRDD.flatMap(List::iterator)
@@ -115,27 +181,124 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
   /**
    * Compute the estimated number of bloom filter comparisons to be performed on each file group.
    */
-  private Map<String, Long> computeComparisonsPerFileGroup(
+  private Map<HoodieFileGroupId, Long> computeComparisonsPerFileGroup(
       final HoodieWriteConfig config,
       final Map<String, Long> recordsPerPartition,
       final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
-      final JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD,
+      final JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD,
       final HoodieEngineContext context) {
-    Map<String, Long> fileToComparisons;
+    Map<HoodieFileGroupId, Long> fileToComparisons;
     if (config.getBloomIndexPruneByRanges()) {
       // we will just try exploding the input and then count to determine comparisons
       // FIX(vc): Only do sampling here and extrapolate?
       context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files: " + config.getTableName());
-      fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
+      fileToComparisons = fileComparisonsRDD.countByKey();
     } else {
       fileToComparisons = new HashMap<>();
-      partitionToFileInfo.forEach((key, value) -> {
-        for (BloomIndexFileInfo fileInfo : value) {
+      partitionToFileInfo.forEach((partitionPath, fileInfos) -> {
+        for (BloomIndexFileInfo fileInfo : fileInfos) {
           // each file needs to be compared against all the records coming into the partition
-          fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(key));
+          fileToComparisons.put(
+              new HoodieFileGroupId(partitionPath, fileInfo.getFileId()), recordsPerPartition.get(partitionPath));
         }
       });
     }
     return fileToComparisons;
   }
+
+  private static HoodieTableFileSystemView getBaseFileOnlyView(HoodieTable<?, ?, ?, ?> hoodieTable, Collection<String> partitionPaths) {
+    try {
+      List<String> fullPartitionPaths = partitionPaths.stream()
+          .map(partitionPath ->
+              String.format("%s/%s", hoodieTable.getMetaClient().getBasePathV2(), partitionPath))
+          .collect(Collectors.toList());
+
+      FileStatus[] allFiles =
+          hoodieTable.getMetadataTable().getAllFilesInPartitions(fullPartitionPaths).values().stream()
+              .flatMap(Arrays::stream)
+              .toArray(FileStatus[]::new);
+
+      return new HoodieTableFileSystemView(hoodieTable.getMetaClient(), hoodieTable.getActiveTimeline(), allFiles);
+    } catch (IOException e) {
+      LOG.error(String.format("Failed to fetch all files for partitions (%s)", partitionPaths));
+      throw new HoodieIOException("Failed to fetch all files for partitions", e);
+    }
+  }
+
+  static class AffineBloomIndexFileGroupPartitioner extends Partitioner {
+
+    private final Broadcast<HoodieTableFileSystemView> latestBaseFilesBroadcast;
+
+    // TODO(HUDI-5619) remove when addressed
+    private final Map<String, Map<String, String>> cachedLatestBaseFileNames =
+        new HashMap<>(16);
+
+    private final int targetPartitions;
+
+    AffineBloomIndexFileGroupPartitioner(Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast,
+                                         int targetPartitions) {
+      this.targetPartitions = targetPartitions;
+      this.latestBaseFilesBroadcast = baseFileOnlyViewBroadcast;
+    }
+
+    @Override
+    public int numPartitions() {
+      return targetPartitions;
+    }
+
+    @Override
+    public int getPartition(Object key) {
+      HoodieFileGroupId partitionFileGroupId = (HoodieFileGroupId) key;
+      String partitionPath = partitionFileGroupId.getPartitionPath();
+      String fileGroupId = partitionFileGroupId.getFileId();
+
+      /*
+      // TODO(HUDI-5619) uncomment when addressed
+      String baseFileName =
+          latestBaseFilesBroadcast.getValue()
+              .getLatestBaseFile(partitionPath, fileGroupId)
+              .orElseThrow(() -> new HoodieException(
+                  String.format("File from file-group (%s) not found in partition path (%s)", fileGroupId, partitionPath)))
+              .getFileName();
+       */
+
+      // NOTE: This is a workaround to alleviate performance impact of needing to process whole
+      //       partition for every file-group being looked up.
+      //       See HUDI-5619 for more details
+      String baseFileName = cachedLatestBaseFileNames.computeIfAbsent(partitionPath, ignored ->
+              latestBaseFilesBroadcast.getValue()
+                  .getLatestBaseFiles(partitionPath)
+                  .collect(
+                      Collectors.toMap(HoodieBaseFile::getFileId, BaseFile::getFileName)
+                  )
+          )
+          .get(fileGroupId);
+
+      if (baseFileName == null) {
+        throw new HoodieException(
+            String.format("File from file-group (%s) not found in partition path (%s)", fileGroupId, partitionPath));
+      }
+
+      String bloomIndexEncodedKey =
+          getBloomFilterIndexKey(new PartitionIndexID(partitionPath), new FileIndexID(baseFileName));
+
+      // NOTE: It's crucial that [[targetPartitions]] be congruent w/ the number of
+      //       actual file-groups in the Bloom Index in MT
+      return mapRecordKeyToFileGroupIndex(bloomIndexEncodedKey, targetPartitions);
+    }
+  }
+
+  public static class HoodieSparkBloomIndexCheckFunction extends HoodieBloomIndexCheckFunction<Tuple2<HoodieFileGroupId, String>>
+      implements FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>, List<HoodieKeyLookupResult>> {
+
+    public HoodieSparkBloomIndexCheckFunction(HoodieTable hoodieTable,
+                                              HoodieWriteConfig config) {
+      super(hoodieTable, config, t -> t._1, t -> t._2);
+    }
+
+    @Override
+    public Iterator<List<HoodieKeyLookupResult>> call(Iterator<Tuple2<HoodieFileGroupId, String>> fileGroupIdRecordKeyPairIterator) {
+      return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
+    }
+  }
 }
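
Editorial note on the hunk above: the comment in findMatchingFilesForRecordKeys relies on the identity (hash(key) % N) % M = hash(key) % M holding for every key only when N is a multiple of M, which is why the target parallelism is rounded up to a multiple of the Bloom Filter file-group count. The following is a minimal, standalone sketch of that arithmetic. The class and method names are hypothetical, and it assumes non-negative integer hash values; it does not reproduce the actual hashing done by mapRecordKeyToFileGroupIndex.

```java
final class AffinePartitioningSketch {

  // Rounds targetParallelism up to the nearest multiple of fileGroupCount,
  // using the same expression as the patch above.
  public static int roundUpToMultiple(int targetParallelism, int fileGroupCount) {
    return targetParallelism % fileGroupCount == 0
        ? targetParallelism
        : (targetParallelism / fileGroupCount + 1) * fileGroupCount;
  }

  // Checks, for every hash value in [0, hashRange), whether the file-group index
  // derived from the Spark partition index matches the one derived from the hash.
  // ASSUMPTION: hash values are non-negative integers.
  public static boolean isAffine(int sparkPartitions, int fileGroupCount, int hashRange) {
    for (int hash = 0; hash < hashRange; hash++) {
      int viaSparkPartition = (hash % sparkPartitions) % fileGroupCount;
      int direct = hash % fileGroupCount;
      if (viaSparkPartition != direct) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    int fileGroupCount = 4; // M, the default mentioned in the patch comment
    int adjusted = roundUpToMultiple(10, fileGroupCount);
    System.out.println("adjusted parallelism: " + adjusted);
    System.out.println("affine with N=" + adjusted + ": " + isAffine(adjusted, fileGroupCount, 10_000));
    System.out.println("affine with N=10: " + isAffine(10, fileGroupCount, 10_000));
  }
}
```

With M = 4, a requested parallelism of 10 is rounded up to 12. With N = 10 the identity already breaks at hash = 10, since (10 % 10) % 4 = 0 while 10 % 4 = 2, so a single Spark task could end up reading more than one MT file-group.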
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index f4edf7fb942..5853b4eb8a8 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -23,17 +23,17 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql._
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql._
 import org.apache.spark.storage.StorageLevel
 
 import java.util.Locale
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
index e946450c904..fe178839c0d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.index.bloom;
 
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.junit.jupiter.api.Test;
@@ -36,33 +37,37 @@ public class TestBucketizedBloomCheckPartitioner {
 
   @Test
   public void testAssignmentCorrectness() {
-    Map<String, Long> fileToComparisons = new HashMap<String, Long>() {
+    HoodieFileGroupId fg1 = new HoodieFileGroupId("p1", "f1");
+    HoodieFileGroupId fg2 = new HoodieFileGroupId("p1", "f2");
+    HoodieFileGroupId fg3 = new HoodieFileGroupId("p1", "f3");
+
+    Map<HoodieFileGroupId, Long> fileToComparisons = new HashMap<HoodieFileGroupId, Long>() {
       {
-        put("f1", 40L);
-        put("f2", 35L);
-        put("f3", 20L);
+        put(fg1, 40L);
+        put(fg2, 35L);
+        put(fg3, 20L);
       }
     };
     BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(4, fileToComparisons, 10);
-    Map<String, List<Integer>> assignments = p.getFileGroupToPartitions();
-    assertEquals(4, assignments.get("f1").size(), "f1 should have 4 buckets");
-    assertEquals(4, assignments.get("f2").size(), "f2 should have 4 buckets");
-    assertEquals(2, assignments.get("f3").size(), "f3 should have 2 buckets");
-    assertArrayEquals(new Integer[] {0, 0, 1, 3}, assignments.get("f1").toArray(), "f1 spread across 3 partitions");
-    assertArrayEquals(new Integer[] {1, 2, 2, 0}, assignments.get("f2").toArray(), "f2 spread across 3 partitions");
-    assertArrayEquals(new Integer[] {3, 1}, assignments.get("f3").toArray(), "f3 spread across 2 partitions");
+    Map<HoodieFileGroupId, List<Integer>> assignments = p.getFileGroupToPartitions();
+    assertEquals(4, assignments.get(fg1).size(), "f1 should have 4 buckets");
+    assertEquals(4, assignments.get(fg2).size(), "f2 should have 4 buckets");
+    assertEquals(2, assignments.get(fg3).size(), "f3 should have 2 buckets");
+    assertArrayEquals(new Integer[] {0, 0, 1, 3}, assignments.get(fg1).toArray(), "f1 spread across 3 partitions");
+    assertArrayEquals(new Integer[] {2, 2, 3, 1}, assignments.get(fg2).toArray(), "f2 spread across 3 partitions");
+    assertArrayEquals(new Integer[] {1, 0}, assignments.get(fg3).toArray(), "f3 spread across 2 partitions");
   }
 
   @Test
   public void testUniformPacking() {
     // evenly distribute 10 buckets/file across 100 partitions
-    Map<String, Long> comparisons1 = new HashMap<String, Long>() {
+    Map<HoodieFileGroupId, Long> comparisons1 = new HashMap<HoodieFileGroupId, Long>() {
       {
-        IntStream.range(0, 10).forEach(f -> put("f" + f, 100L));
+        IntStream.range(0, 10).forEach(f -> put(new HoodieFileGroupId("p1", "f" + f), 100L));
       }
     };
     BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(100, comparisons1, 10);
-    Map<String, List<Integer>> assignments = partitioner.getFileGroupToPartitions();
+    Map<HoodieFileGroupId, List<Integer>> assignments = partitioner.getFileGroupToPartitions();
     assignments.forEach((key, value) -> assertEquals(10, value.size()));
     Map<Integer, Long> partitionToNumBuckets =
         assignments.entrySet().stream().flatMap(e -> e.getValue().stream().map(p -> Pair.of(p, e.getKey())))
@@ -72,9 +77,9 @@ public class TestBucketizedBloomCheckPartitioner {
 
   @Test
   public void testNumPartitions() {
-    Map<String, Long> comparisons1 = new HashMap<String, Long>() {
+    Map<HoodieFileGroupId, Long> comparisons1 = new HashMap<HoodieFileGroupId, Long>() {
       {
-        IntStream.range(0, 10).forEach(f -> put("f" + f, 100L));
+        IntStream.range(0, 10).forEach(f -> put(new HoodieFileGroupId("p1", "f" + f), 100L));
       }
     };
     BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(10000, comparisons1, 10);
@@ -83,15 +88,15 @@ public class TestBucketizedBloomCheckPartitioner {
 
   @Test
   public void testGetPartitions() {
-    Map<String, Long> comparisons1 = new HashMap<String, Long>() {
+    Map<HoodieFileGroupId, Long> comparisons1 = new HashMap<HoodieFileGroupId, Long>() {
       {
-        IntStream.range(0, 100000).forEach(f -> put("f" + f, 100L));
+        IntStream.range(0, 100000).forEach(f -> put(new HoodieFileGroupId("p1", "f" + f), 100L));
       }
     };
     BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(1000, comparisons1, 10);
 
     IntStream.range(0, 100000).forEach(f -> {
-      int partition = p.getPartition(Pair.of("f" + f, "value"));
+      int partition = p.getPartition(Pair.of(new HoodieFileGroupId("p1", "f" + f), "value"));
       assertTrue(0 <= partition && partition <= 1000, "partition is out of range: " + partition);
     });
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 5be4e4ce624..3c906062c16 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -35,7 +36,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
-import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -259,12 +259,14 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
         jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"),
             new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t);
 
-    List<Pair<String, HoodieKey>> comparisonKeyList = HoodieJavaRDD.getJavaRDD(
-        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieJavaPairRDD.of(partitionRecordKeyPairRDD))).collect();
+    List<Pair<HoodieFileGroupId, String>> comparisonKeyList =
+        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieJavaPairRDD.of(partitionRecordKeyPairRDD)).collectAsList();
 
     assertEquals(10, comparisonKeyList.size());
     Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
-        .collect(Collectors.groupingBy(t -> t.getRight().getRecordKey(), Collectors.mapping(Pair::getLeft, Collectors.toList())));
+        .collect(
+            Collectors.groupingBy(t -> t.getRight(),
+                Collectors.mapping(t -> t.getLeft().getFileId(), Collectors.toList())));
 
     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002")));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 3ad8952feea..2577f9ba284 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -21,6 +21,7 @@ package org.apache.hudi.index.bloom;
 import org.apache.hudi.client.functional.TestHoodieMetadataBase;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -29,7 +30,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
-import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -201,9 +201,9 @@ public class TestHoodieGlobalBloomIndex extends TestHoodieMetadataBase {
         jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"),
             new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t);
 
-    List<Pair<String, HoodieKey>> comparisonKeyList = HoodieJavaRDD.getJavaRDD(
-        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo,
-            HoodieJavaPairRDD.of(partitionRecordKeyPairRDD))).collect();
+    List<Pair<HoodieFileGroupId, String>> comparisonKeyList =
+        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieJavaPairRDD.of(partitionRecordKeyPairRDD))
+            .collectAsList();
 
     /*
      * expecting: f4, HoodieKey { recordKey=003 partitionPath=2017/10/23} f1, HoodieKey { recordKey=003
@@ -216,7 +216,7 @@ public class TestHoodieGlobalBloomIndex extends TestHoodieMetadataBase {
     assertEquals(10, comparisonKeyList.size());
 
     Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
-        .collect(Collectors.groupingBy(t -> t.getRight().getRecordKey(), Collectors.mapping(Pair::getKey, Collectors.toList())));
+        .collect(Collectors.groupingBy(t -> t.getRight(), Collectors.mapping(t -> t.getLeft().getFileId(), Collectors.toList())));
 
     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new HashSet<>(recordKeyToFileComps.get("002")));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 1d56e63fad9..9ce0947883f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -105,9 +105,9 @@ public interface HoodieData<T> extends Serializable {
       Iterator<O>> func, boolean preservesPartitioning);
 
   /**
-   * Maps every element in the collection into a collection of the new elements (provided by
-   * {@link Iterator}) using provided mapping {@code func}, subsequently flattening the result
-   * (by concatenating) into a single collection
+   * Maps every element in the collection into a collection of the new elements using provided
+   * mapping {@code func}, subsequently flattening the result (by concatenating) into a single
+   * collection
    *
    * This is an intermediate operation
    *
@@ -117,6 +117,17 @@ public interface HoodieData<T> extends Serializable {
    */
   <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func);
 
+  /**
+   * Maps every element in the collection into a collection of the {@link Pair}s of new elements
+   * using provided mapping {@code func}, subsequently flattening the result (by concatenating) into
+   * a single collection
+   *
+   * NOTE: This operation converts the container from {@link HoodieData} to {@link HoodiePairData}
+   *
+   * This is an intermediate operation
+   */
+  <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, Iterator<? extends Pair<K, V>>> func);
+
   /**
    * Maps every element in the collection using provided mapping {@code func} into a {@link Pair<K, V>}
    * of elements {@code K} and {@code V}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index b2a503a85b3..c6287a744e0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -125,6 +125,16 @@ public class HoodieListData<T> extends HoodieBaseListData<T> implements HoodieDa
     return new HoodieListData<>(mappedStream, lazy);
   }
 
+  @Override
+  public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, Iterator<? extends Pair<K, V>>> func) {
+    Function<T, Iterator<? extends Pair<K, V>>> mapper = throwingMapWrapper(func);
+    Stream<Pair<K, V>> mappedStream = asStream().flatMap(e ->
+        StreamSupport.stream(
+            Spliterators.spliteratorUnknownSize(mapper.apply(e), Spliterator.ORDERED), true));
+
+    return new HoodieListPairData<>(mappedStream, lazy);
+  }
+
   @Override
   public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, V> func) {
     Function<T, Pair<K, V>> throwableMapToPairFunc = throwingMapToPairWrapper(func);
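
The flatMapToPair implementation above turns each per-element Iterator into a Stream via Spliterators.spliteratorUnknownSize and concatenates the results. Below is a minimal, self-contained sketch of that pattern over a plain List. The class and method names (FlatMapToPairSketch, toStream, flatMapToPair) are hypothetical stand-ins rather than Hudi APIs, Map.Entry is used in place of Hudi's Pair, and the stream is sequential here (the patch requests a parallel one) so that the output order is easy to follow.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FlatMapToPairSketch {

  // Wraps an Iterator of unknown size into a sequential, ordered Stream
  static <E> Stream<E> toStream(Iterator<E> it) {
    return StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
  }

  // Hypothetical stand-in for flatMapToPair: maps every element into an
  // Iterator of key-value pairs and concatenates all of them into one list
  static <T, K, V> List<Map.Entry<K, V>> flatMapToPair(
      List<T> input, Function<T, Iterator<Map.Entry<K, V>>> func) {
    return input.stream()
        .flatMap(e -> toStream(func.apply(e)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Every string yields two pairs: (s, 0) and (s, length of s)
    Function<String, Iterator<Map.Entry<String, Integer>>> func = s -> {
      List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
      pairs.add(new SimpleEntry<>(s, 0));
      pairs.add(new SimpleEntry<>(s, s.length()));
      return pairs.iterator();
    };

    // prints [a=0, a=1, bb=0, bb=2]
    System.out.println(flatMapToPair(Arrays.asList("a", "bb"), func));
  }
}
```
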
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
index a389649548e..39ce1411575 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
@@ -23,15 +23,20 @@ import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.MappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Spliterator;
+import java.util.Spliterators;
 import java.util.function.Function;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
@@ -136,6 +141,24 @@ public class HoodieListPairData<K, V> extends HoodieBaseListData<Pair<K, V>> imp
     return new HoodieListData<>(asStream().map(uncheckedMapper), lazy);
   }
 
+  @Override
+  public <W> HoodiePairData<K, W> mapValues(SerializableFunction<V, W> func) {
+    Function<V, W> uncheckedMapper = throwingMapWrapper(func);
+    return new HoodieListPairData<>(asStream().map(p -> Pair.of(p.getKey(), uncheckedMapper.apply(p.getValue()))), lazy);
+  }
+
+  public <W> HoodiePairData<K, W> flatMapValues(SerializableFunction<V, Iterator<W>> func) {
+    Function<V, Iterator<W>> uncheckedMapper = throwingMapWrapper(func);
+    return new HoodieListPairData<>(asStream().flatMap(p -> {
+      Iterator<W> mappedValuesIterator = uncheckedMapper.apply(p.getValue());
+      Iterator<Pair<K, W>> mappedPairsIterator =
+          new MappingIterator<>(mappedValuesIterator, w -> Pair.of(p.getKey(), w));
+
+      return StreamSupport.stream(
+          Spliterators.spliteratorUnknownSize(mappedPairsIterator, Spliterator.ORDERED), true);
+    }), lazy);
+  }
+
   @Override
   public <L, W> HoodiePairData<L, W> mapToPair(SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc) {
     return new HoodieListPairData<>(asStream().map(p -> throwingMapToPairWrapper(mapToPairFunc).apply(p)), lazy);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
index 49fa7174da9..1d3622786fd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
@@ -90,12 +90,17 @@ public interface HoodiePairData<K, V> extends Serializable {
   HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> combiner, int parallelism);
 
   /**
-   * @param func serializable map function.
-   * @param <O>  output object type.
-   * @return {@link HoodieData<O>} containing the result. Actual execution may be deferred.
+   * Maps key-value pairs of this {@link HoodiePairData} container leveraging provided mapper
+   *
+   * NOTE: This returns {@link HoodieData}, not {@link HoodiePairData}
    */
   <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func);
 
+  /**
+   * Maps values of this {@link HoodiePairData} container leveraging provided mapper
+   */
+  <W> HoodiePairData<K, W> mapValues(SerializableFunction<V, W> func);
+
   /**
    * @param mapToPairFunc serializable map function to generate another pair.
    * @param <L>           new key type.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/function/ThrowingConsumer.java
similarity index 59%
copy from hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
copy to hudi-common/src/main/java/org/apache/hudi/common/function/ThrowingConsumer.java
index a4655764a97..1cbae113fcb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/function/ThrowingConsumer.java
@@ -16,29 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.common.util.collection;
+package org.apache.hudi.common.function;
 
-import java.util.Iterator;
-import java.util.function.Function;
-
-// TODO java-docs
-public class MappingIterator<I, O> implements Iterator<O> {
-
-  protected final Iterator<I> source;
-  private final Function<I, O> mapper;
-
-  public MappingIterator(Iterator<I> source, Function<I, O> mapper) {
-    this.source = source;
-    this.mapper = mapper;
-  }
+/**
+ * Throwing counterpart of {@link java.util.function.Consumer}
+ */
+@FunctionalInterface
+public interface ThrowingConsumer<T> {
 
-  @Override
-  public boolean hasNext() {
-    return source.hasNext();
-  }
+  void accept(T t) throws Exception;
 
-  @Override
-  public O next() {
-    return mapper.apply(source.next());
-  }
-}
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 7d497408e64..a8c695240cd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -113,7 +113,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
 
   @Override
   public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
-    GenericRecord record = HoodieAvroUtils.rewriteRecord((GenericRecord) data, targetSchema);
+    GenericRecord record = HoodieAvroUtils.rewriteRecordWithNewSchema(data, targetSchema);
     return new HoodieAvroIndexedRecord(key, record, operation, metaData);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index e7c749a2911..00fa1b97db0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -338,6 +338,10 @@ public class HoodieTableMetaClient implements Serializable {
     return hadoopConf.get();
   }
 
+  public SerializableConfiguration getSerializableHadoopConf() {
+    return hadoopConf;
+  }
+
   /**
    * Get the active instants as a timeline.
    *
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatteningIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatteningIterator.java
new file mode 100644
index 00000000000..4adf6b5c91f
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatteningIterator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator flattening source {@link Iterator} holding other {@link Iterator}s
+ */
+public final class FlatteningIterator<T, I extends Iterator<T>> implements Iterator<T> {
+
+  private final Iterator<I> sourceIterator;
+  private Iterator<T> innerSourceIterator;
+
+  public FlatteningIterator(Iterator<I> source) {
+    this.sourceIterator = source;
+  }
+
+  public boolean hasNext() {
+    while (innerSourceIterator == null || !innerSourceIterator.hasNext()) {
+      if (sourceIterator.hasNext()) {
+        innerSourceIterator = sourceIterator.next();
+      } else {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public T next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+
+    return innerSourceIterator.next();
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
index a4655764a97..24b0961470b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
@@ -21,7 +21,9 @@ package org.apache.hudi.common.util.collection;
 import java.util.Iterator;
 import java.util.function.Function;
 
-// TODO java-docs
+/**
+ * Iterator mapping elements of the provided source {@link Iterator} from {@code I} to {@code O}
+ */
 public class MappingIterator<I, O> implements Iterator<O> {
 
   protected final Iterator<I> source;
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 1034aa31255..f343d6437cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -19,12 +19,12 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -73,11 +73,12 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
 
   private static final Logger LOG = LogManager.getLogger(BaseTableMetadata.class);
 
-  public static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
-  public static final int BUFFER_SIZE = 10 * 1024 * 1024;
+  protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  // NOTE: Buffer size is deliberately set low, since MT internally relies
+  //       on HFile (serving as a persisted binary key-value mapping) to do caching
+  protected static final int BUFFER_SIZE = 10 * 1024; // 10 KB
 
   protected final transient HoodieEngineContext engineContext;
-  protected final SerializableConfiguration hadoopConf;
   protected final SerializablePath dataBasePath;
   protected final HoodieTableMetaClient dataMetaClient;
   protected final Option<HoodieMetadataMetrics> metrics;
@@ -92,9 +93,11 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
   protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
                               String dataBasePath, String spillableMapDirectory) {
     this.engineContext = engineContext;
-    this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf());
     this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath));
-    this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(dataBasePath).build();
+    this.dataMetaClient = HoodieTableMetaClient.builder()
+        .setConf(engineContext.getHadoopConf().get())
+        .setBasePath(dataBasePath)
+        .build();
     this.spillableMapDirectory = spillableMapDirectory;
     this.metadataConfig = metadataConfig;
 
@@ -123,8 +126,10 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
         throw new HoodieMetadataException("Failed to retrieve list of partition from metadata", e);
       }
     }
-    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(),
-        metadataConfig.shouldAssumeDatePartitioning()).getAllPartitionPaths();
+
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
+        createFileSystemBackedTableMetadata();
+    return fileSystemBackedTableMetadata.getAllPartitionPaths();
   }
 
   /**
@@ -148,8 +153,9 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
       }
     }
 
-    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning())
-        .getAllFilesInPartition(partitionPath);
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
+        createFileSystemBackedTableMetadata();
+    return fileSystemBackedTableMetadata.getAllFilesInPartition(partitionPath);
   }
 
   @Override
@@ -168,8 +174,9 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
       }
     }
 
-    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning())
-        .getAllFilesInPartitions(partitions);
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
+        createFileSystemBackedTableMetadata();
+    return fileSystemBackedTableMetadata.getAllFilesInPartitions(partitions);
   }
 
   @Override
@@ -205,6 +212,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
     HoodieTimer timer = HoodieTimer.start();
     Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
     Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    // TODO simplify (no sorting is required)
     partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
           final String bloomFilterIndexKey = HoodieMetadataPayload.getBloomFilterIndexKey(
               new PartitionIndexID(partitionNameFileNamePair.getLeft()), new FileIndexID(partitionNameFileNamePair.getRight()));
@@ -227,7 +235,11 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
         if (bloomFilterMetadata.isPresent()) {
           if (!bloomFilterMetadata.get().getIsDeleted()) {
             ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
-            final ByteBuffer bloomFilterByteBuffer = bloomFilterMetadata.get().getBloomFilter();
+            // NOTE: We have to duplicate the [[ByteBuffer]] object here since:
+            //        - Reading out [[ByteBuffer]] mutates its state
+            //        - [[BloomFilterMetadata]] could be re-used, and hence has to stay immutable
+            final ByteBuffer bloomFilterByteBuffer =
+                bloomFilterMetadata.get().getBloomFilter().duplicate();
             final String bloomFilterType = bloomFilterMetadata.get().getType();
             final BloomFilter bloomFilter = BloomFilterFactory.fromString(
                 StandardCharsets.UTF_8.decode(bloomFilterByteBuffer).toString(), bloomFilterType);
@@ -332,7 +344,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
       HoodieMetadataPayload metadataPayload = record.getData();
       checkForSpuriousDeletes(metadataPayload, recordKey);
       try {
-        return metadataPayload.getFileStatuses(hadoopConf.get(), partitionPath);
+        return metadataPayload.getFileStatuses(getHadoopConf(), partitionPath);
       } catch (IOException e) {
         throw new HoodieIOException("Failed to extract file-statuses from the payload", e);
       }
@@ -358,7 +370,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
         getRecordsByKeys(new ArrayList<>(partitionIdToPathMap.keySet()), MetadataPartitionType.FILES.getPartitionPath());
     metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
 
-    FileSystem fs = partitionPaths.get(0).getFileSystem(hadoopConf.get());
+    FileSystem fs = partitionPaths.get(0).getFileSystem(getHadoopConf());
 
     Map<String, FileStatus[]> partitionPathToFilesMap = partitionIdRecordPairs.parallelStream()
         .map(pair -> {
@@ -399,18 +411,27 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
     }
   }
 
+  private FileSystemBackedTableMetadata createFileSystemBackedTableMetadata() {
+    return new FileSystemBackedTableMetadata(getEngineContext(), dataMetaClient.getSerializableHadoopConf(), dataBasePath.toString(),
+        metadataConfig.shouldAssumeDatePartitioning());
+  }
+
   protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName);
 
   public abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName);
 
   protected HoodieEngineContext getEngineContext() {
-    return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get());
+    return engineContext != null ? engineContext : new HoodieLocalEngineContext(getHadoopConf());
   }
 
   public HoodieMetadataConfig getMetadataConfig() {
     return metadataConfig;
   }
 
+  protected Configuration getHadoopConf() {
+    return dataMetaClient.getHadoopConf();
+  }
+
   protected String getLatestDataInstantTime() {
     return dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
         .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
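
The NOTE in the bloom-filter hunk above says that reading out a ByteBuffer mutates its state, which is why the patch decodes a duplicate() instead of the buffer held by the metadata record. A minimal sketch of that behavior using only the JDK follows; the class name ByteBufferDuplicateSketch and the sample payload are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferDuplicateSketch {

  // Decodes the remaining bytes of the buffer as UTF-8;
  // as a side effect this advances the buffer's position to its limit
  static String decode(ByteBuffer buffer) {
    return StandardCharsets.UTF_8.decode(buffer).toString();
  }

  public static void main(String[] args) {
    ByteBuffer shared = ByteBuffer.wrap("bloom".getBytes(StandardCharsets.UTF_8));

    // A duplicate shares the content but has its own position and limit,
    // so the shared buffer can be read any number of times
    String first = decode(shared.duplicate());
    String second = decode(shared.duplicate());
    // prints: bloom bloom remaining=5
    System.out.println(first + " " + second + " remaining=" + shared.remaining());

    // Decoding the shared buffer directly drains it: the second read is empty
    String third = decode(shared);
    String fourth = decode(shared);
    // prints: [bloom] [] remaining=0
    System.out.println("[" + third + "] [" + fourth + "] remaining=" + shared.remaining());
  }
}
```
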
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 09c6b35309a..ecb0da8792d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -24,7 +24,6 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -53,6 +52,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.util.Transient;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieSeekingFileReader;
 
@@ -90,19 +90,18 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
   private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);
 
-  private static final Schema METADATA_RECORD_SCHEMA = HoodieMetadataRecord.getClassSchema();
+  private final String metadataBasePath;
 
-  private String metadataBasePath;
-  // Metadata table's timeline and metaclient
   private HoodieTableMetaClient metadataMetaClient;
   private HoodieTableConfig metadataTableConfig;
+
   private HoodieTableFileSystemView metadataFileSystemView;
   // should we reuse the open file handles, across calls
   private final boolean reuse;
 
   // Readers for the latest file slice corresponding to file groups in the metadata partition
-  private final Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>> partitionReaders =
-      new ConcurrentHashMap<>();
+  private final Transient<Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>>> partitionReaders =
+      Transient.lazy(ConcurrentHashMap::new);
 
   public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
                                    String datasetBasePath, String spillableMapDirectory) {
@@ -113,18 +112,19 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
                                    String datasetBasePath, String spillableMapDirectory, boolean reuse) {
     super(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory);
     this.reuse = reuse;
+    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath.toString());
+
     initIfNeeded();
   }
 
   private void initIfNeeded() {
-    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath.toString());
     if (!isMetadataTableEnabled) {
       if (!HoodieTableMetadata.isMetadataTable(metadataBasePath)) {
         LOG.info("Metadata table is disabled.");
       }
     } else if (this.metadataMetaClient == null) {
       try {
-        this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataBasePath).build();
+        this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(getHadoopConf()).setBasePath(metadataBasePath).build();
         this.metadataFileSystemView = getFileSystemView(metadataMetaClient);
         this.metadataTableConfig = metadataMetaClient.getTableConfig();
         this.isBloomFilterIndexEnabled = metadataConfig.isBloomFilterIndexEnabled();
@@ -213,14 +213,14 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
               return mergedRecords.stream()
                 .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+                .filter(Objects::nonNull)
                 .iterator();
             } catch (IOException ioe) {
               throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
             } finally {
               closeReader(readers);
             }
-          })
-        .filter(Objects::nonNull);
+          });
   }
 
   @Override
@@ -425,7 +425,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
     if (reuse) {
       Pair<String, String> key = Pair.of(partitionName, slice.getFileId());
-      return partitionReaders.computeIfAbsent(key, ignored -> openReaders(partitionName, slice));
+      return partitionReaders.get().computeIfAbsent(key, ignored -> openReaders(partitionName, slice));
     } else {
       return openReaders(partitionName, slice);
     }
@@ -462,7 +462,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     if (basefile.isPresent()) {
       String baseFilePath = basefile.get().getPath();
       baseFileReader = (HoodieSeekingFileReader<?>) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
-          .getFileReader(hadoopConf.get(), new Path(baseFilePath));
+          .getFileReader(getHadoopConf(), new Path(baseFilePath));
       baseFileOpenMs = timer.endTimer();
       LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath,
           basefile.get().getCommitTime(), baseFileOpenMs));
@@ -596,7 +596,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    */
   private synchronized void close(Pair<String, String> partitionFileSlicePair) {
     Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
-        partitionReaders.remove(partitionFileSlicePair);
+        partitionReaders.get().remove(partitionFileSlicePair);
     closeReader(readers);
   }
 
@@ -604,10 +604,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    * Close and clear all the partitions readers.
    */
   private void closePartitionReaders() {
-    for (Pair<String, String> partitionFileSlicePair : partitionReaders.keySet()) {
+    for (Pair<String, String> partitionFileSlicePair : partitionReaders.get().keySet()) {
       close(partitionFileSlicePair);
     }
-    partitionReaders.clear();
+    partitionReaders.get().clear();
   }
 
   private void closeReader(Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers) {
@@ -629,10 +629,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return isMetadataTableEnabled;
   }
 
-  public SerializableConfiguration getHadoopConf() {
-    return hadoopConf;
-  }
-
   public HoodieTableMetaClient getMetadataMetaClient() {
     return metadataMetaClient;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index 76b341609de..4d1db7e81d4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -54,9 +54,8 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
                                       HoodieTableMetaClient metaClient,
                                       HoodieTimeline visibleActiveTimeline,
                                       HoodieMetadataConfig metadataConfig) {
-    super(metaClient, visibleActiveTimeline);
-    this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath(),
-        FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true);
+    this(metaClient, visibleActiveTimeline, HoodieTableMetadata.create(engineContext, metadataConfig,
+        metaClient.getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true));
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/util/Transient.java b/hudi-common/src/main/java/org/apache/hudi/util/Transient.java
new file mode 100644
index 00000000000..0d8f6ad6565
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/util/Transient.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.function.SerializableSupplier;
+import org.apache.hudi.common.function.ThrowingConsumer;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Serializable;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+/**
+ * {@link Serializable} counterpart of {@link Lazy}
+ *
+ * @param <T> type of the object being held by {@link Transient}
+ */
+@ThreadSafe
+public class Transient<T> implements Serializable {
+
+  private SerializableSupplier<T> initializer;
+
+  private transient boolean initialized;
+  private transient T ref;
+
+  private Transient(SerializableSupplier<T> initializer) {
+    checkArgument(initializer != null);
+
+    this.initializer = initializer;
+    this.ref = null;
+    this.initialized = false;
+  }
+
+  private Transient(T value, SerializableSupplier<T> initializer) {
+    checkArgument(value != null);
+    checkArgument(initializer != null);
+
+    this.initializer = initializer;
+    this.ref = value;
+    this.initialized = true;
+  }
+
+  public T get() {
+    if (!initialized) {
+      synchronized (this) {
+        if (!initialized) {
+          this.ref = initializer.get();
+          initialized = true;
+        }
+      }
+    }
+
+    return ref;
+  }
+
+  public void reset() {
+    synchronized (this) {
+      this.ref = null;
+      this.initialized = false;
+    }
+  }
+
+  public void destroy(ThrowingConsumer<T> cleaner) throws Exception {
+    synchronized (this) {
+      if (initialized) {
+        cleaner.accept(ref);
+      }
+
+      this.ref = null;
+      this.initialized = false;
+      this.initializer = null;
+    }
+  }
+
+  /**
+   * Creates an instance of {@link Transient} that lazily executes the provided {@code initializer}
+   * to instantiate a value of type {@code T}. The same initializer is used to re-instantiate
+   * the value after the original one is dropped during a serialization/deserialization cycle
+   */
+  public static <T> Transient<T> lazy(SerializableSupplier<T> initializer) {
+    return new Transient<>(initializer);
+  }
+
+  /**
+   * Creates an instance of {@link Transient} that is eagerly set to the provided {@code value},
+   * while the given {@code initializer} is used to re-instantiate the value after the original
+   * one is dropped during a serialization/deserialization cycle
+   */
+  public static <T> Transient<T> eager(T value, SerializableSupplier<T> initializer) {
+    return new Transient<>(value, initializer);
+  }
+}
\ No newline at end of file
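
The Transient holder above serializes only its initializer: the held value lives in transient fields, so it is dropped by Java serialization and rebuilt on the first get() after deserialization. The sketch below reproduces that round trip with JDK classes only. All names in it (TransientSketch, LazyHolder, NewListSupplier, roundTrip) are hypothetical and simplified, not the Hudi classes; the initializer is a named class rather than a lambda, and the volatile flag is this sketch's own choice for making the double-checked locking safe.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class TransientSketch {

  // Hypothetical stand-in for a serializable supplier interface
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  // Named initializer that creates a fresh, empty list on every call
  static class NewListSupplier implements SerializableSupplier<List<String>> {
    @Override
    public List<String> get() {
      return new ArrayList<>();
    }
  }

  // Simplified holder: the initializer is serialized, the value is not
  static class LazyHolder<T> implements Serializable {
    private final SerializableSupplier<T> initializer;
    // Both fields are reset to their defaults (false / null) on deserialization
    private transient volatile boolean initialized;
    private transient T ref;

    LazyHolder(SerializableSupplier<T> initializer) {
      this.initializer = initializer;
    }

    T get() {
      if (!initialized) {
        synchronized (this) {
          if (!initialized) {
            ref = initializer.get();
            initialized = true;
          }
        }
      }
      return ref;
    }
  }

  // Serializes and deserializes an object through an in-memory byte array
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    LazyHolder<List<String>> holder = new LazyHolder<>(new NewListSupplier());
    holder.get().add("open-reader");

    LazyHolder<List<String>> copy = roundTrip(holder);
    System.out.println(holder.get().size()); // prints 1: original keeps its state
    System.out.println(copy.get().size());   // prints 0: copy was re-initialized
  }
}
```

This mirrors how the patch wraps partitionReaders: the map of open readers is never shipped along with the serialized object, and each deserialized copy starts with a fresh, empty map.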
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestIterators.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestIterators.java
new file mode 100644
index 00000000000..6e9d43388b1
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestIterators.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestIterators {
+
+  @Test
+  public void testFlatteningIterator() {
+    List<List<Integer>> listOfList =
+        Arrays.asList(
+            Arrays.asList(0),
+            Arrays.asList(1, 2),
+            Collections.emptyList(),
+            Arrays.asList(3, 4, 5)
+        );
+
+    List<Integer> flattenedList =
+        toStream(new FlatteningIterator<>(new MappingIterator<>(listOfList.iterator(), List::iterator)))
+            .collect(Collectors.toList());
+
+    assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5), flattenedList);
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index da53f00e697..c4d7c1ff5b0 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -33,13 +33,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, LogicalPlan}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
-import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._