Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/05 21:48:12 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4352: [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

nsivabalan commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r779071706



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1417,6 +1417,19 @@ public boolean useBloomIndexBucketizedChecking() {
     return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
   }
 
+  public boolean isMetaIndexBloomFilterEnabled() {
+    return isMetadataTableEnabled() && getMetadataConfig().isMetaIndexBloomFilterEnabled();
+  }
+
+  public boolean isMetaIndexColumnStatsForAllColumns() {
+    return isMetadataTableEnabled() && isMetaIndexColumnStatsForAllColumns()
+        && getMetadataConfig().isMetaIndexColumnStatsForAllColumns();

Review comment:
       May I know why we have two config checks here?
   isMetaIndexColumnStatsForAllColumns()
   getMetadataConfig().isMetaIndexColumnStatsForAllColumns()
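   For illustration, a minimal sketch of how this could collapse to a single check, assuming the metadata config flag is meant to be the single source of truth (a hypothetical simplification, not the PR's code):
    ```java
    public boolean isMetaIndexColumnStatsForAllColumns() {
      // Guard on the metadata table being enabled, then consult only the metadata config flag,
      // instead of having the write-config method call back into itself.
      return isMetadataTableEnabled()
          && getMetadataConfig().isMetaIndexColumnStatsForAllColumns();
    }
    ```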

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyMetaIndexBatchLookupHandle.MetaBloomIndexGroupedKeyLookupResult;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<MetaBloomIndexGroupedKeyLookupResult>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<MetaBloomIndexGroupedKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<MetaBloomIndexGroupedKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      fileToKeysMap.computeIfAbsent(Pair.of(entry._2.getPartitionPath(), entry._1), k -> new ArrayList<>()).add(entry._2);
+    }
+
+    List<Pair<PartitionIndexID, FileIndexID>> partitionIDFileIDList =
+        fileToKeysMap.keySet().stream().map(partitionFileIdPair -> {
+          return Pair.of(new PartitionIndexID(partitionFileIdPair.getLeft()),
+              new FileIndexID(partitionFileIdPair.getRight()));
+        }).collect(Collectors.toList());
+
+    Map<String, ByteBuffer> fileIDToBloomFilterByteBufferMap =

Review comment:
       I was expecting that we would sort the records before looking them up in the metadata table. Is that intentionally avoided, or do we rely on metadataTable.getBloomFilters() to sort them internally?
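   If caller-side ordering is desired, a rough sketch (purely illustrative; getBloomFilters() may already handle the sorting internally):
    ```java
    // Sort the lookup pairs by the same encoded key that the bloom filter partition is keyed on.
    // Requires java.util.Comparator in addition to the imports already present.
    List<Pair<PartitionIndexID, FileIndexID>> sortedIDs = partitionIDFileIDList.stream()
        .sorted(Comparator.comparing((Pair<PartitionIndexID, FileIndexID> p) ->
            p.getLeft().asBase64EncodedString().concat(p.getRight().asBase64EncodedString())))
        .collect(Collectors.toList());
    Map<String, ByteBuffer> fileIDToBloomFilterByteBufferMap =
        hoodieTable.getMetadataTable().getBloomFilters(sortedIDs);
    ```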

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyMetaIndexBatchLookupHandle.MetaBloomIndexGroupedKeyLookupResult;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<MetaBloomIndexGroupedKeyLookupResult>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<MetaBloomIndexGroupedKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {

Review comment:
       What is the first entry of the tuple in Iterator<Tuple2<String, HoodieKey>> tuple2Iterator?
   Is it the fileId or the fileName? It looks like the fileName.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -735,14 +768,19 @@ protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String cre
     List<String> partitions = partitionInfoList.stream().map(p ->
         p.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : p.getRelativePath()).collect(Collectors.toList());
     final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum();
+    final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
 
     // Record which saves the list of all partitions
     HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
     if (partitions.isEmpty()) {
-      // in case of boostrapping of a fresh table, there won't be any partitions, but we need to make a boostrap commit
-      commit(engineContext.parallelize(Collections.singletonList(allPartitionRecord), 1), MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
+      // in case of bootstrapping of a fresh table, there won't be any partitions, but we need to make a boostrap commit
+      final HoodieData<HoodieRecord> allPartitionRecordsRDD = engineContext.parallelize(
+          Collections.singletonList(allPartitionRecord), 1);
+      partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD);
+      commit(createInstantTime, partitionToRecordsMap, false);

Review comment:
       Don't we need to add records to the column stats and bloom filter partitions here as well (bootstrap code path)?
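   If those partitions are meant to be seeded here too, something along these lines (a hedged sketch: the record lists and the COLUMN_STATS partition type name are assumptions, not code from this PR):
    ```java
    // Seed the other metadata index partitions during bootstrap, alongside the FILES partition.
    partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD);
    partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS,
        engineContext.parallelize(bloomFilterBootstrapRecords, 1));   // hypothetical record list
    partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS,
        engineContext.parallelize(columnStatsBootstrapRecords, 1));   // hypothetical record list
    commit(createInstantTime, partitionToRecordsMap, false);
    ```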

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -165,17 +167,40 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
-
-    return recordsRDD.map(r -> {
-      FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
-      r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
-      return r;
-    });
+  private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
+    // The result set
+    JavaRDD<HoodieRecord> rddAllPartitionRecords = null;
+
+    for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : partitionRecordsMap.entrySet()) {
+      final String partitionName = entry.getKey().partitionPath();
+      final int fileGroupCount = entry.getKey().getFileGroupCount();
+      HoodieData<HoodieRecord> records = entry.getValue();
+      JavaRDD<HoodieRecord> recordsRDD = (JavaRDD<HoodieRecord>) records.get();
+
+      List<FileSlice> fileSlices =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
+      ValidationUtils.checkArgument(fileSlices.size() == fileGroupCount,
+          String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), fileGroupCount));
+
+      JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
+      JavaRDD<HoodieRecord> rddSinglePartitionRecords = recordsRDD.map(r -> {
+        FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
+            fileGroupCount));
+        r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+        return r;
+      });
+
+      if (rddAllPartitionRecords == null) {
+        rddAllPartitionRecords = rddSinglePartitionRecords;
+

Review comment:
       remove extra line break

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,118 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Instant timestamp when this metadata was created/updated",
+                            "name": "timestamp",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Bloom filter binary byte array",
+                            "name": "bloomFilter",
+                            "type": "bytes"
                         },
                         {
+                            "doc": "Bloom filter entry valid/deleted flag",
                             "name": "isDeleted",
-                            "type": "boolean",
-                            "doc": "True if this file has been deleted"
+                            "type": "boolean"
+                        },
+                        {
+                            "doc": "Reserved bytes for future use",
+                            "name": "reserved",

Review comment:
       Why do we need this reserved field?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,6 +155,97 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final PartitionIndexID partitionIndexID, final FileIndexID fileIndexID)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    final String bloomIndexKey = partitionIndexID.asBase64EncodedString().concat(fileIndexID.asBase64EncodedString());
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(bloomIndexKey,
+        MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));
+
+    if (!hoodieRecord.isPresent()) {
+      LOG.error("Meta bloom filter index: lookup failed for partition: " + partitionIndexID.getName() + ", file: "
+          + fileIndexID.getName());
+      return Option.empty();
+    }
+
+    final Option<HoodieMetadataBloomFilter> fileBloomFilter = hoodieRecord.get().getData().getBloomFilterMetadata();
+    if (!fileBloomFilter.isPresent()) {
+      LOG.error("Meta bloom filter index: bloom filter missing for partition: " + partitionIndexID.getName()
+          + ", file: " + fileIndexID.getName());
+      return Option.empty();
+    }
+
+    return Option.of(fileBloomFilter.get().getBloomFilter());
+  }
+
+  @Override
+  public Map<String, ByteBuffer> getBloomFilters(final List<Pair<PartitionIndexID, FileIndexID>> partitionFileIndexIDList)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta bloom filter index is disabled!");

Review comment:
       Shouldn't we throw an exception here instead of returning an empty result?
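   i.e. something along these lines instead of silently returning an empty map (sketch only):
    ```java
    if (!isMetaIndexBloomFilterEnabled) {
      // Fail fast so callers don't mistake a disabled index for "no bloom filters found".
      throw new HoodieMetadataException("Meta bloom filter index is disabled, cannot serve bloom filter lookups");
    }
    ```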

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,6 +155,97 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final PartitionIndexID partitionIndexID, final FileIndexID fileIndexID)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    final String bloomIndexKey = partitionIndexID.asBase64EncodedString().concat(fileIndexID.asBase64EncodedString());
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(bloomIndexKey,
+        MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));
+
+    if (!hoodieRecord.isPresent()) {
+      LOG.error("Meta bloom filter index: lookup failed for partition: " + partitionIndexID.getName() + ", file: "
+          + fileIndexID.getName());
+      return Option.empty();
+    }
+
+    final Option<HoodieMetadataBloomFilter> fileBloomFilter = hoodieRecord.get().getData().getBloomFilterMetadata();
+    if (!fileBloomFilter.isPresent()) {
+      LOG.error("Meta bloom filter index: bloom filter missing for partition: " + partitionIndexID.getName()
+          + ", file: " + fileIndexID.getName());
+      return Option.empty();
+    }
+
+    return Option.of(fileBloomFilter.get().getBloomFilter());
+  }
+
+  @Override
+  public Map<String, ByteBuffer> getBloomFilters(final List<Pair<PartitionIndexID, FileIndexID>> partitionFileIndexIDList)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta bloom filter index is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    partitionFileIndexIDList.forEach(partitionIDFileIDPair -> {
+          final String bloomKey = partitionIDFileIDPair.getLeft().asBase64EncodedString()
+              .concat(partitionIDFileIDPair.getRight().asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+        }
+    );
+    List<String> partitionIDFileIDStrings = new ArrayList<>(partitionIDFileIDSortedStrings);
+
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));
+
+    Map<String, ByteBuffer> fileToBloomFilterMap = new HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieMetadataBloomFilter> optionalBloomFilterMetadata =
+            entry.getRight().get().getData().getBloomFilterMetadata();
+        if (optionalBloomFilterMetadata.isPresent()) {
+          fileToBloomFilterMap.put(entry.getLeft(), optionalBloomFilterMetadata.get().getBloomFilter());
+        }
+      }
+    }
+    return fileToBloomFilterMap;
+  }
+
+  @Override
+  public Map<String, HoodieColumnStats> getColumnStats(List<String> keySet) throws HoodieMetadataException {
+    if (!isMetaIndexColumnStatsEnabled) {
+      LOG.error("Meta column range index is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> hoodieRecordList =

Review comment:
       do we know if the keySet is sorted? 
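   If it is not guaranteed, a defensive sort before the lookup would be cheap (sketch only):
    ```java
    List<String> sortedKeys = new ArrayList<>(keySet);
    Collections.sort(sortedKeys);
    // then pass sortedKeys into getRecordsByKeys(...) as before
    ```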

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -125,30 +129,43 @@ private void initIfNeeded() {
     return recordsByKeys.size() == 0 ? Option.empty() : recordsByKeys.get(0).getValue();
   }
 
-  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, String partitionName) {
-    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(keys.get(0), partitionName);
-    try {
-      List<Long> timings = new ArrayList<>();
-      HoodieFileReader baseFileReader = readers.getKey();
-      HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+  @Override
+  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
+                                                                                             String partitionName) {
+    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSlices(partitionName, keys);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
+    AtomicInteger fileSlicesKeysCount = new AtomicInteger();
+    partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {

Review comment:
       Shouldn't we be parallelizing the reads across multiple file slices (e.g. with a spark map)? Sequentially iterating over 100 file groups might have an impact on perf.
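   A rough sketch of the kind of fan-out I have in mind, mirroring the context.flatMap(...) usage elsewhere in this PR. Treat it as pseudocode: lookupKeysInFileSlice is a hypothetical helper wrapping the current per-slice read, and the per-slice inputs would need to be in a serializable form.
    ```java
    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result =
        engineContext.flatMap(new ArrayList<>(partitionFileSliceToKeysMap.entrySet()),
            entry -> lookupKeysInFileSlice(entry.getKey(), entry.getValue()).stream(),
            Math.max(partitionFileSliceToKeysMap.size(), 1));
    ```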

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +173,71 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     }
   }
 
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table

Review comment:
       Can you add Javadoc for the return value as well?
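   e.g. (wording is just a suggestion):
    ```java
    /**
     * Load the column stats index as BloomIndexFileInfo for all the involved files in the partitions.
     *
     * @param partitions  - List of partitions for which column stats need to be loaded
     * @param context     - Engine context
     * @param hoodieTable - Hoodie table
     * @return List of (partition, BloomIndexFileInfo) pairs carrying the min/max record key range per file
     */
    ```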

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -111,13 +124,14 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
   private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
       HoodiePairData<String, String> partitionRecordKeyPairs, final HoodieEngineContext context,
       final HoodieTable hoodieTable) {
-    // Obtain records per partition, in the incoming records
+    // Step 1: Obtain records per partition, in the incoming records
     Map<String, Long> recordsPerPartition = partitionRecordKeyPairs.countByKey();
     List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
 
     // Step 2: Load all involved files as <Partition, filename> pairs
-    List<Pair<String, BloomIndexFileInfo>> fileInfoList =
-        loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
+    List<Pair<String, BloomIndexFileInfo>> fileInfoList = (config.getMetadataConfig().isMetaIndexColumnStatsEnabled()

Review comment:
       Even in the case of column stats, the first entry in each pair of the returned list is the fileId and not the fileName, right?
   Interested to see what we do while looking up the bloom filter if we don't have the entire file name handy. Will continue reviewing.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +173,71 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     }
   }
 
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    if (config.getBloomIndexPruneByRanges()) {
+      // also obtain file ranges, if range pruning is enabled
+      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+
+      final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      return context.flatMap(partitions, new SerializableFunction<String, Stream<Pair<String, BloomIndexFileInfo>>>() {
+        @Override
+        public Stream<Pair<String, BloomIndexFileInfo>> apply(String partitionName) throws Exception {
+          final String columnIndexID = new ColumnIndexID(keyField).asBase64EncodedString();
+          final String partitionIndexID = new PartitionIndexID(partitionName).asBase64EncodedString();
+
+          List<Pair<String, String>> partitionFileIdList =
+              HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+                      hoodieTable).stream().map(baseFile -> Pair.of(baseFile.getFileId(), baseFile.getFileName()))
+                  .collect(toList());
+          try {
+            Map<String, String> columnStatKeyToFileIdMap = new HashMap<>();
+            List<String> columnStatKeys = new ArrayList<>();
+            for (Pair<String, String> fileIdFileName : partitionFileIdList) {
+              final String columnStatIndexKey = columnIndexID
+                  .concat(partitionIndexID)
+                  .concat(new FileIndexID(fileIdFileName.getLeft()).asBase64EncodedString());
+              columnStatKeys.add(columnStatIndexKey);
+              columnStatKeyToFileIdMap.put(columnStatIndexKey, fileIdFileName.getLeft());
+            }
+            Collections.sort(columnStatKeys);
+
+            Map<String, HoodieColumnStats> columnKeyHashToStatMap = hoodieTable

Review comment:
       Note to self:
   we delegate the column stats lookup to the executors, so each executor reads the column stats pertaining to one Hudi partition.
   

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +173,71 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     }
   }
 
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    if (config.getBloomIndexPruneByRanges()) {
+      // also obtain file ranges, if range pruning is enabled
+      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+
+      final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      return context.flatMap(partitions, new SerializableFunction<String, Stream<Pair<String, BloomIndexFileInfo>>>() {
+        @Override
+        public Stream<Pair<String, BloomIndexFileInfo>> apply(String partitionName) throws Exception {
+          final String columnIndexID = new ColumnIndexID(keyField).asBase64EncodedString();
+          final String partitionIndexID = new PartitionIndexID(partitionName).asBase64EncodedString();
+
+          List<Pair<String, String>> partitionFileIdList =
+              HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+                      hoodieTable).stream().map(baseFile -> Pair.of(baseFile.getFileId(), baseFile.getFileName()))
+                  .collect(toList());
+          try {
+            Map<String, String> columnStatKeyToFileIdMap = new HashMap<>();
+            List<String> columnStatKeys = new ArrayList<>();
+            for (Pair<String, String> fileIdFileName : partitionFileIdList) {
+              final String columnStatIndexKey = columnIndexID
+                  .concat(partitionIndexID)
+                  .concat(new FileIndexID(fileIdFileName.getLeft()).asBase64EncodedString());
+              columnStatKeys.add(columnStatIndexKey);
+              columnStatKeyToFileIdMap.put(columnStatIndexKey, fileIdFileName.getLeft());
+            }
+            Collections.sort(columnStatKeys);
+
+            Map<String, HoodieColumnStats> columnKeyHashToStatMap = hoodieTable
+                .getMetadataTable().getColumnStats(columnStatKeys);
+            List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
+            for (Map.Entry<String, HoodieColumnStats> entry : columnKeyHashToStatMap.entrySet()) {
+              result.add(Pair.of(partitionName,
+                  new BloomIndexFileInfo(
+                      columnStatKeyToFileIdMap.get(entry.getKey()),
+                      entry.getValue().getMinValue(),
+                      entry.getValue().getMaxValue()
+                  )));
+            }
+            return result.stream();
+          } catch (MetadataNotFoundException me) {
+            throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me);
+          }
+        }
+      }, Math.max(partitions.size(), 1));
+    } else {
+      // Obtain the latest data files from all the partitions.
+      List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context,

Review comment:
       Can we reuse code here? Is this the same as the else block of loadInvolvedFiles?
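   For example, both call sites could delegate to one small helper (name and exact shape are assumptions):
    ```java
    // Shared fallback used when range pruning is disabled: latest base files only, no key ranges.
    private List<Pair<String, BloomIndexFileInfo>> loadLatestBaseFilesWithoutRanges(
        List<String> partitions, HoodieEngineContext context, HoodieTable hoodieTable) {
      return getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
          .map(pf -> Pair.of(pf.getLeft(), new BloomIndexFileInfo(pf.getRight())))
          .collect(toList());
    }
    ```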

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexLookupHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T,
+    I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieKeyMetaIndexLookupHandle.class);
+  private final HoodieTableType tableType;
+  private final BloomFilter bloomFilter;
+  private final List<String> candidateRecordKeys;
+  private long totalKeysChecked;
+
+  public HoodieKeyMetaIndexLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
+                                        Pair<String, String> partitionPathFileIdPair, String fileId) {
+    super(config, null, hoodieTable, partitionPathFileIdPair);
+    this.tableType = hoodieTable.getMetaClient().getTableType();
+    this.candidateRecordKeys = new ArrayList<>();
+    this.totalKeysChecked = 0;
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<ByteBuffer> bloomFilterByteBuffer =

Review comment:
       From what I infer, the only difference between this class and HoodieKeyLookupHandle is the instantiation of the BloomFilter; the rest of the code is the same. It may be good to abstract that out.
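   One possible shape for that refactor, sketched with hypothetical names (not code from this PR): keep the shared logic in a base handle and pass in only the bloom filter.
    ```java
    public abstract class BaseHoodieKeyLookupHandle<T extends HoodieRecordPayload, I, K, O>
        extends HoodieReadHandle<T, I, K, O> {

      protected final BloomFilter bloomFilter;

      protected BaseHoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
                                          Pair<String, String> partitionPathFileIdPair,
                                          BloomFilter bloomFilter) {
        super(config, null, hoodieTable, partitionPathFileIdPair);
        // The only piece that differs: HoodieKeyLookupHandle reads the filter from the base file,
        // HoodieKeyMetaIndexLookupHandle reads it from the metadata table.
        this.bloomFilter = bloomFilter;
      }

      // checkCandidatesAgainstFile(), addKey(), etc. would live here unchanged.
    }
    ```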

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -80,9 +90,12 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     HoodiePairData<String, String> partitionRecordKeyPairs = records.mapToPair(
         record -> new ImmutablePair<>(record.getPartitionPath(), record.getRecordKey()));
 
+    HoodieTimer timer = new HoodieTimer().startTimer();
     // Step 2: Lookup indexes for all the partition/recordkey pair
     HoodiePairData<HoodieKey, HoodieRecordLocation> keyFilenamePairs =
         lookupIndex(partitionRecordKeyPairs, context, hoodieTable);
+    final long indexLookupTime = timer.endTimer();
+    LOG.debug("Index lookup time taken: " + indexLookupTime + " ms");

Review comment:
       Minor: in my opinion, index lookup should also include tagging the location back to the records. Maybe we can reword this a bit, but I will leave it to you.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +635,90 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
     return records;
   }
 
+  /**
+   * Convert rollback action metadata to bloom filter index records.
+   */
+  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                     HoodieTableMetaClient dataMetaClient,
+                                                                     Map<String, List<String>> partitionToDeletedFiles,
+                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+        return;
+      }
+
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+          new PartitionIndexID(partition), new FileIndexID(deletedFile),
+          instantTime, ByteBuffer.allocate(0), true));
+    }));
+
+    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      appendedFileMap.forEach((appendedFile, length) -> {
+        if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+          return;
+        }
+        final String pathWithPartition = partitionName + "/" + appendedFile;

Review comment:
       Do you know whether this code block was exercised in any of your testing?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexLazyCheckFunction.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyMetaIndexLookupHandle;
+import org.apache.hudi.io.HoodieKeyMetaIndexLookupHandle.MetaBloomIndexKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexLazyCheckFunction implements Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<MetaBloomIndexKeyLookupResult>>> {

Review comment:
       Again, this looks very similar to HoodieBloomIndexCheckFunction. Did you consider abstracting it out?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexBatchLookupHandle.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexBatchLookupHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T,
+    I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieKeyMetaIndexBatchLookupHandle.class);
+
+  private final HoodieTableType tableType;
+
+  private final BloomFilter bloomFilter;
+
+  private final List<String> candidateRecordKeys;
+
+  private long totalKeysChecked;
+
+  public HoodieKeyMetaIndexBatchLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,

Review comment:
       May I know where this is used?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyMetaIndexBatchLookupHandle.MetaBloomIndexGroupedKeyLookupResult;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<MetaBloomIndexGroupedKeyLookupResult>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<MetaBloomIndexGroupedKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<MetaBloomIndexGroupedKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      fileToKeysMap.computeIfAbsent(Pair.of(entry._2.getPartitionPath(), entry._1), k -> new ArrayList<>()).add(entry._2);
+    }
+
+    List<Pair<PartitionIndexID, FileIndexID>> partitionIDFileIDList =
+        fileToKeysMap.keySet().stream().map(partitionFileIdPair -> {
+          return Pair.of(new PartitionIndexID(partitionFileIdPair.getLeft()),
+              new FileIndexID(partitionFileIdPair.getRight()));
+        }).collect(Collectors.toList());
+
+    Map<String, ByteBuffer> fileIDToBloomFilterByteBufferMap =
+        hoodieTable.getMetadataTable().getBloomFilters(partitionIDFileIDList);
+
+    fileToKeysMap.forEach((partitionPathFileIdPair, hoodieKeyList) -> {
+      final String partitionPath = partitionPathFileIdPair.getLeft();
+      final String fileId = partitionPathFileIdPair.getRight();
+      ValidationUtils.checkState(!fileId.isEmpty());
+
+      final String partitionIDHash = new PartitionIndexID(partitionPath).asBase64EncodedString();
+      final String fileIDHash = new FileIndexID(fileId).asBase64EncodedString();

Review comment:
       Does L90 refer to a fileId or a fileName? Can you fix the variable naming, please?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +635,90 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
     return records;
   }
 
+  /**
+   * Convert rollback action metadata to bloom filter index records.
+   */
+  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                     HoodieTableMetaClient dataMetaClient,
+                                                                     Map<String, List<String>> partitionToDeletedFiles,
+                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+        return;
+      }
+
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+          new PartitionIndexID(partition), new FileIndexID(deletedFile),
+          instantTime, ByteBuffer.allocate(0), true));
+    }));
+
+    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      appendedFileMap.forEach((appendedFile, length) -> {
+        if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+          return;
+        }
+        final String pathWithPartition = partitionName + "/" + appendedFile;
+        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
+        try {
+          HoodieFileReader<IndexedRecord> fileReader =
+              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath);
+          final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+          if (fileBloomFilter == null) {
+            LOG.error("Failed to read bloom filter for " + appendedFilePath);
+            return;
+          }
+          ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+          HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
+              new PartitionIndexID(partition), new FileIndexID(appendedFile), instantTime,
+              bloomByteBuffer, false);
+          records.add(record);
+          fileReader.close();
+        } catch (IOException e) {
+          LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
+        }
+      });
+    });
+    return records;
+  }
+
+  /**
+   * Convert rollback action metadata to column stats index records.
+   */
+  private static List<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                     HoodieTableMetaClient datasetMetaClient,
+                                                                     Map<String, List<String>> partitionToDeletedFiles,
+                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    List<String> latestColumns = getLatestColumns(datasetMetaClient);
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        final String filePathWithPartition = partitionName + "/" + deletedFile;
+        records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient,

Review comment:
       We can't fetch column stats for a deleted file (the file no longer exists). Wondering whether we tested this code path, or am I missing something?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
##########
@@ -67,18 +66,43 @@ public static SparkHoodieBloomIndexHelper getInstance() {
       HoodieData<ImmutablePair<String, HoodieKey>> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
     JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
         HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
             .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
-    Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(
-        config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context);
-    int inputParallelism =
-        HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
+
+    int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
     int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
     LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
         + config.getBloomIndexParallelism() + "}");
 
-    if (config.useBloomIndexBucketizedChecking()) {
+    if (config.isMetaIndexBloomFilterEnabled()) {
+      // Step 1: Sort by file id
+      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
+          fileComparisonsRDD.sortBy(entry -> entry._1, true, joinParallelism);
+
+      // Step 2: Use bloom filter to filter and the actual log file to get the record location
+      final boolean isBloomFiltersBatchLoadEnabled = config.isMetaIndexBloomFilterBatchLoadEnabled();
+      if (isBloomFiltersBatchLoadEnabled) {
+        return HoodieJavaPairRDD.of(sortedFileIdAndKeyPairs.mapPartitionsWithIndex(
+                new HoodieBloomMetaIndexBatchCheckFunction(hoodieTable, config), true)
+            .flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)

Review comment:
       I see this code snippet is repeated. 
   ```
   .flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)
               .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
                   .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
                       new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
                   .collect(Collectors.toList()).iterator()));
   ```
   including lines 116 to 122. Did you consider reusing the code?
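   e.g. the shared tail could live in one private helper (name is an assumption):
    ```java
    private static JavaPairRDD<HoodieKey, HoodieRecordLocation> toKeyLocationPairs(
        JavaRDD<List<MetaBloomIndexGroupedKeyLookupResult>> lookupResults) {
      // Flatten the per-partition result lists, drop empty matches, and emit (key, location) pairs.
      return lookupResults.flatMap(List::iterator)
          .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
          .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
              .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
                  new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
              .collect(Collectors.toList()).iterator());
    }
    ```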

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexLookupHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T,
+    I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieKeyMetaIndexLookupHandle.class);
+  private final HoodieTableType tableType;
+  private final BloomFilter bloomFilter;
+  private final List<String> candidateRecordKeys;
+  private long totalKeysChecked;
+
+  public HoodieKeyMetaIndexLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
+                                        Pair<String, String> partitionPathFileIdPair, String fileId) {
+    super(config, null, hoodieTable, partitionPathFileIdPair);
+    this.tableType = hoodieTable.getMetaClient().getTableType();
+    this.candidateRecordKeys = new ArrayList<>();
+    this.totalKeysChecked = 0;
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<ByteBuffer> bloomFilterByteBuffer =
+        hoodieTable.getMetadataTable().getBloomFilter(new PartitionIndexID(partitionPathFileIdPair.getLeft()),
+            new FileIndexID(fileId));
+    if (!bloomFilterByteBuffer.isPresent()) {
+      throw new HoodieIndexException("BloomFilter missing for " + fileId);
+    }
+
+    // TODO: Go via the factory and the filter type
+    this.bloomFilter =
+        new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
+            BloomFilterTypeCode.DYNAMIC_V0);
+    LOG.debug(String.format("Read bloom filter from %s,%s, size: %s in %d ms", partitionPathFileIdPair, fileId,
+        bloomFilterByteBuffer.get().array().length, timer.endTimer()));
+  }
+
+  /**
+   * Given a list of row keys and one file, return only row keys existing in that file.
+   */
+  public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
+                                                 Path filePath) throws HoodieIndexException {
+    List<String> foundRecordKeys = new ArrayList<>();
+    try {
+      // Load all rowKeys from the file, to double-confirm
+      if (!candidateRecordKeys.isEmpty()) {
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
+        foundRecordKeys.addAll(fileRowKeys);
+        LOG.debug(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
+            timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
+        LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
+      }
+    } catch (Exception e) {
+      throw new HoodieIndexException("Error checking candidate keys against file.", e);
+    }
+    return foundRecordKeys;
+  }
+
+  /**
+   * Adds the key for look up.
+   */
+  public void addKey(String recordKey) {
+    // check record key against bloom filter of current file & add to possible keys if needed
+    if (bloomFilter.mightContain(recordKey)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Record key " + recordKey + " matches bloom filter in  " + partitionPathFilePair);
+      }
+      candidateRecordKeys.add(recordKey);
+    }
+    totalKeysChecked++;
+  }
+
+  /**
+   * Of all the keys, that were added, return a list of keys that were actually found in the file group.
+   */
+  public MetaBloomIndexKeyLookupResult getLookupResult() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys);
+    }
+
+    HoodieBaseFile dataFile = getLatestDataFile();
+    List<String> matchingKeys =
+        checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
+    LOG.debug(String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)",
+        totalKeysChecked, candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(),
+        matchingKeys.size()));
+    return new MetaBloomIndexKeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
+        dataFile.getCommitTime(), matchingKeys);
+  }
+
+  /**
+   * Encapsulates the result from a key lookup.
+   */
+  public static class MetaBloomIndexKeyLookupResult {

Review comment:
       How is this different from HoodieKeyLookupHandle.KeyLookupResult? If there is no difference, can we re-use that code?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexBatchLookupHandle.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexBatchLookupHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T,
+    I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieKeyMetaIndexBatchLookupHandle.class);
+
+  private final HoodieTableType tableType;
+
+  private final BloomFilter bloomFilter;
+
+  private final List<String> candidateRecordKeys;
+
+  private long totalKeysChecked;
+
+  public HoodieKeyMetaIndexBatchLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
+                                             Pair<String, String> partitionPathFileIdPair, String fileName) {
+    super(config, null, hoodieTable, partitionPathFileIdPair);
+    this.tableType = hoodieTable.getMetaClient().getTableType();
+
+    this.candidateRecordKeys = new ArrayList<>();
+    this.totalKeysChecked = 0;
+    HoodieTimer timer = new HoodieTimer().startTimer();
+
+    Option<ByteBuffer> bloomFilterByteBuffer =
+        hoodieTable.getMetadataTable().getBloomFilter(new PartitionIndexID(partitionPathFileIdPair.getLeft()),
+            new FileIndexID(fileName));
+    if (!bloomFilterByteBuffer.isPresent()) {
+      throw new HoodieIndexException("BloomFilter missing for " + fileName);
+    }
+
+    // TODO: Go via the factory and the filter type
+    this.bloomFilter =
+        new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
+            BloomFilterTypeCode.DYNAMIC_V0);
+
+    LOG.debug(String.format("Read bloom filter from %s,%s, size: %s in %d ms", partitionPathFileIdPair, fileName,
+        bloomFilterByteBuffer.get().array().length, timer.endTimer()));
+  }
+
+  /**
+   * Given a list of row keys and one file, return only row keys existing in that file.
+   */
+  public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
+                                                 Path filePath) throws HoodieIndexException {
+    List<String> foundRecordKeys = new ArrayList<>();
+    try {
+      // Load all rowKeys from the file, to double-confirm
+      if (!candidateRecordKeys.isEmpty()) {
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
+        foundRecordKeys.addAll(fileRowKeys);
+        LOG.debug(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
+            timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
+        LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
+      }
+    } catch (Exception e) {
+      throw new HoodieIndexException("Error checking candidate keys against file.", e);
+    }
+    return foundRecordKeys;
+  }
+
+  /**
+   * Adds the key for look up.
+   */
+  public void addKey(String recordKey) {
+    // check record key against bloom filter of current file & add to possible keys if needed
+    if (bloomFilter.mightContain(recordKey)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Record key " + recordKey + " matches bloom filter in  " + partitionPathFilePair);
+      }
+      candidateRecordKeys.add(recordKey);
+    }
+    totalKeysChecked++;
+  }
+
+  /**
+   * Of all the keys, that were added, return a list of keys that were actually found in the file group.
+   */
+  public MetaBloomIndexGroupedKeyLookupResult getLookupResult() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys);
+    }
+
+    HoodieBaseFile dataFile = getLatestDataFile();
+    List<String> matchingKeys =
+        checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
+    LOG.debug(
+        String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked,
+            candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
+    return new MetaBloomIndexGroupedKeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
+        dataFile.getCommitTime(), matchingKeys);
+  }
+
+  /**
+   * Encapsulates the result from a key lookup.
+   */
+  public static class MetaBloomIndexGroupedKeyLookupResult {

Review comment:
       May I know what the difference is between this class and MetaBloomIndexKeyLookupResult?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyMetaIndexBatchLookupHandle.MetaBloomIndexGroupedKeyLookupResult;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<MetaBloomIndexGroupedKeyLookupResult>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<MetaBloomIndexGroupedKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<MetaBloomIndexGroupedKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      fileToKeysMap.computeIfAbsent(Pair.of(entry._2.getPartitionPath(), entry._1), k -> new ArrayList<>()).add(entry._2);
+    }
+
+    List<Pair<PartitionIndexID, FileIndexID>> partitionIDFileIDList =
+        fileToKeysMap.keySet().stream().map(partitionFileIdPair -> {
+          return Pair.of(new PartitionIndexID(partitionFileIdPair.getLeft()),
+              new FileIndexID(partitionFileIdPair.getRight()));
+        }).collect(Collectors.toList());
+
+    Map<String, ByteBuffer> fileIDToBloomFilterByteBufferMap =
+        hoodieTable.getMetadataTable().getBloomFilters(partitionIDFileIDList);
+
+    fileToKeysMap.forEach((partitionPathFileIdPair, hoodieKeyList) -> {
+      final String partitionPath = partitionPathFileIdPair.getLeft();
+      final String fileId = partitionPathFileIdPair.getRight();
+      ValidationUtils.checkState(!fileId.isEmpty());
+
+      final String partitionIDHash = new PartitionIndexID(partitionPath).asBase64EncodedString();
+      final String fileIDHash = new FileIndexID(fileId).asBase64EncodedString();
+      final String bloomKey = partitionIDHash.concat(fileIDHash);
+      if (!fileIDToBloomFilterByteBufferMap.containsKey(bloomKey)) {
+        throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileIdPair);
+      }
+      final ByteBuffer fileBloomFilterByteBuffer = fileIDToBloomFilterByteBufferMap.get(bloomKey);
+
+      HoodieDynamicBoundedBloomFilter fileBloomFilter =

Review comment:
       We can't assume it is dynamic. We have to read the persisted BloomFilterTypeCode and deduce the filter type from it.
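   For example, something along these lines (sketch only; it assumes the stored type code is reachable from the metadata payload, e.g. via the `type` field on `HoodieMetadataBloomFilter`, and the accessor shown is illustrative):
   ```
   // Rebuild the filter via the factory using the persisted type code instead of
   // hard-coding HoodieDynamicBoundedBloomFilter / DYNAMIC_V0.
   String serializedFilter = StandardCharsets.UTF_8.decode(fileBloomFilterByteBuffer).toString();
   String typeCode = bloomFilterMetadata.getType(); // hypothetical accessor for the stored BloomFilterTypeCode
   BloomFilter fileBloomFilter = BloomFilterFactory.fromString(serializedFilter, typeCode);
   ```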

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
##########
@@ -101,10 +102,20 @@ protected void initRegistry() {
   }
 
   @Override
-  protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
+  protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap,
+                        boolean canTriggerTableService) {
+    for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> partitionTypeHoodieDataEntry : partitionRecordsMap.entrySet()) {
+      commit(partitionTypeHoodieDataEntry.getValue(), partitionTypeHoodieDataEntry.getKey(),

Review comment:
       Not sure I understand the flow here, but are we making multiple commits (one per partition), or am I missing something? I was expecting a single commit for the whole batch.
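   To make the expectation concrete, a rough sketch of a single-commit flow (illustrative only; `tagWithLocation` and `commitOnce` are hypothetical placeholders for the existing tagging and commit logic):
   ```
   // Gather every partition's records first, then commit once,
   // instead of calling commit(...) inside the loop (one deltacommit per partition).
   List<HoodieRecord> allRecords = new ArrayList<>();
   partitionRecordsMap.forEach((partitionType, hoodieData) ->
       allRecords.addAll(tagWithLocation(hoodieData.collectAsList(), partitionType)));
   commitOnce(instantTime, allRecords, canTriggerTableService);
   ```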

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -165,17 +167,40 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
-
-    return recordsRDD.map(r -> {
-      FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
-      r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
-      return r;
-    });
+  private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
+    // The result set
+    JavaRDD<HoodieRecord> rddAllPartitionRecords = null;
+
+    for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : partitionRecordsMap.entrySet()) {

Review comment:
       Looks like you fixed this the proper way in Spark but did not fix Flink. I was surprised to see that Flink makes multiple commits (one per partition) for a single batch.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyMetaIndexBatchLookupHandle.MetaBloomIndexGroupedKeyLookupResult;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<MetaBloomIndexGroupedKeyLookupResult>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<MetaBloomIndexGroupedKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<MetaBloomIndexGroupedKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      fileToKeysMap.computeIfAbsent(Pair.of(entry._2.getPartitionPath(), entry._1), k -> new ArrayList<>()).add(entry._2);
+    }
+
+    List<Pair<PartitionIndexID, FileIndexID>> partitionIDFileIDList =
+        fileToKeysMap.keySet().stream().map(partitionFileIdPair -> {
+          return Pair.of(new PartitionIndexID(partitionFileIdPair.getLeft()),
+              new FileIndexID(partitionFileIdPair.getRight()));
+        }).collect(Collectors.toList());
+
+    Map<String, ByteBuffer> fileIDToBloomFilterByteBufferMap =
+        hoodieTable.getMetadataTable().getBloomFilters(partitionIDFileIDList);
+
+    fileToKeysMap.forEach((partitionPathFileIdPair, hoodieKeyList) -> {
+      final String partitionPath = partitionPathFileIdPair.getLeft();
+      final String fileId = partitionPathFileIdPair.getRight();
+      ValidationUtils.checkState(!fileId.isEmpty());
+
+      final String partitionIDHash = new PartitionIndexID(partitionPath).asBase64EncodedString();
+      final String fileIDHash = new FileIndexID(fileId).asBase64EncodedString();
+      final String bloomKey = partitionIDHash.concat(fileIDHash);
+      if (!fileIDToBloomFilterByteBufferMap.containsKey(bloomKey)) {
+        throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileIdPair);
+      }
+      final ByteBuffer fileBloomFilterByteBuffer = fileIDToBloomFilterByteBufferMap.get(bloomKey);
+
+      HoodieDynamicBoundedBloomFilter fileBloomFilter =
+          new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(fileBloomFilterByteBuffer).toString(),
+              BloomFilterTypeCode.DYNAMIC_V0);
+
+      List<String> candidateRecordKeys = new ArrayList<>();
+      hoodieKeyList.forEach(hoodieKey -> {
+        if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) {
+          candidateRecordKeys.add(hoodieKey.getRecordKey());
+        }
+      });
+
+      Option<HoodieBaseFile> dataFile = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+      if (!dataFile.isPresent()) {
+        throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath
+            + ", fileId: " + fileId);
+      }
+
+      List<String> matchingKeys =
+          checkCandidatesAgainstFile(candidateRecordKeys, new Path(dataFile.get().getPath()));
+      LOG.debug(
+          String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)",
+              hoodieKeyList.size(), candidateRecordKeys.size(),
+              candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
+
+      ArrayList<MetaBloomIndexGroupedKeyLookupResult> subList = new ArrayList<>();
+      subList.add(new MetaBloomIndexGroupedKeyLookupResult(fileId, partitionPath, dataFile.get().getCommitTime(),
+          matchingKeys));
+      resultList.add(subList);
+    });
+
+    return resultList.iterator();
+  }
+
+  public List<String> checkCandidatesAgainstFile(List<String> candidateRecordKeys, Path latestDataFilePath) throws HoodieIndexException {

Review comment:
       Don't we already have a similar method in the existing bloom filter code? Unless it has changed drastically, can we try to re-use that code?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexColStatFunction.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.avro.model.HoodieColumnStats;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexColStatFunction
+    implements FlatMapFunction<Iterator<Tuple2<Tuple2<String, String>, HoodieKey>>, Tuple2<Tuple2<String, String>,
+    HoodieKey>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBloomMetaIndexColStatFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexColStatFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {

Review comment:
       Is this used anywhere?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -165,17 +167,40 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
-
-    return recordsRDD.map(r -> {
-      FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
-      r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
-      return r;
-    });
+  private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
+    // The result set
+    JavaRDD<HoodieRecord> rddAllPartitionRecords = null;
+
+    for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : partitionRecordsMap.entrySet()) {
+      final String partitionName = entry.getKey().partitionPath();
+      final int fileGroupCount = entry.getKey().getFileGroupCount();
+      HoodieData<HoodieRecord> records = entry.getValue();
+      JavaRDD<HoodieRecord> recordsRDD = (JavaRDD<HoodieRecord>) records.get();
+
+      List<FileSlice> fileSlices =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
+      ValidationUtils.checkArgument(fileSlices.size() == fileGroupCount,
+          String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), fileGroupCount));
+
+      JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
+      JavaRDD<HoodieRecord> rddSinglePartitionRecords = recordsRDD.map(r -> {
+        FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),

Review comment:
       Looks like you are accessing `List<FileSlice> fileSlices` from within the executor lambda, so the data structure will be serialized into the task closure and shipped to the executors. I don't think it's a problem; just wanted to flag it in case.
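   As an aside, if you ever want to make that shipping explicit, a broadcast would look roughly like this (sketch only):
   ```
   // Broadcast the (small) file-slice list once instead of capturing it in every task closure.
   Broadcast<List<FileSlice>> fileSlicesBc = jsc.broadcast(fileSlices);
   JavaRDD<HoodieRecord> rddSinglePartitionRecords = recordsRDD.map(r -> {
     List<FileSlice> slices = fileSlicesBc.value();
     FileSlice slice = slices.get(
         HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), slices.size()));
     r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
     return r;
   });
   ```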

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,6 +155,97 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final PartitionIndexID partitionIndexID, final FileIndexID fileIndexID)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    final String bloomIndexKey = partitionIndexID.asBase64EncodedString().concat(fileIndexID.asBase64EncodedString());
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(bloomIndexKey,
+        MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));
+
+    if (!hoodieRecord.isPresent()) {
+      LOG.error("Meta bloom filter index: lookup failed for partition: " + partitionIndexID.getName() + ", file: "
+          + fileIndexID.getName());
+      return Option.empty();
+    }
+
+    final Option<HoodieMetadataBloomFilter> fileBloomFilter = hoodieRecord.get().getData().getBloomFilterMetadata();
+    if (!fileBloomFilter.isPresent()) {
+      LOG.error("Meta bloom filter index: bloom filter missing for partition: " + partitionIndexID.getName()
+          + ", file: " + fileIndexID.getName());
+      return Option.empty();
+    }
+
+    return Option.of(fileBloomFilter.get().getBloomFilter());
+  }
+
+  @Override
+  public Map<String, ByteBuffer> getBloomFilters(final List<Pair<PartitionIndexID, FileIndexID>> partitionFileIndexIDList)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta bloom filter index is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    partitionFileIndexIDList.forEach(partitionIDFileIDPair -> {
+          final String bloomKey = partitionIDFileIDPair.getLeft().asBase64EncodedString()
+              .concat(partitionIDFileIDPair.getRight().asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+        }
+    );
+    List<String> partitionIDFileIDStrings = new ArrayList<>(partitionIDFileIDSortedStrings);
+
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));
+
+    Map<String, ByteBuffer> fileToBloomFilterMap = new HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieMetadataBloomFilter> optionalBloomFilterMetadata =
+            entry.getRight().get().getData().getBloomFilterMetadata();
+        if (optionalBloomFilterMetadata.isPresent()) {
+          fileToBloomFilterMap.put(entry.getLeft(), optionalBloomFilterMetadata.get().getBloomFilter());
+        }
+      }
+    }
+    return fileToBloomFilterMap;
+  }
+
+  @Override
+  public Map<String, HoodieColumnStats> getColumnStats(List<String> keySet) throws HoodieMetadataException {
+    if (!isMetaIndexColumnStatsEnabled) {
+      LOG.error("Meta column range index is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> hoodieRecordList =
+        getRecordsByKeys(keySet, MetadataPartitionType.COLUMN_STATS.partitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_METADATA_STR, timer.endTimer()));
+
+    Map<String, HoodieColumnStats> columnToRangeMap = new HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieColumnStats> optionalColumnStatPayload =
+            entry.getRight().get().getData().getColumnStatMetadata();
+        if (optionalColumnStatPayload.isPresent()) {
+          ValidationUtils.checkState(!columnToRangeMap.containsKey(entry.getLeft()));
+          columnToRangeMap.put(entry.getLeft(), optionalColumnStatPayload.get());
+        }
+      }

Review comment:
       Similar question: in what case would the column stats not be present, and why are we completely ignoring it? At the least, error logging would be good.
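   For instance (sketch only):
   ```
   if (optionalColumnStatPayload.isPresent()) {
     ValidationUtils.checkState(!columnToRangeMap.containsKey(entry.getLeft()));
     columnToRangeMap.put(entry.getLeft(), optionalColumnStatPayload.get());
   } else {
     // Surface the anomaly instead of silently dropping the key.
     LOG.error("Meta index: column stats missing for key " + entry.getLeft());
   }
   ```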

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,118 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Instant timestamp when this metadata was created/updated",
+                            "name": "timestamp",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Bloom filter binary byte array",
+                            "name": "bloomFilter",
+                            "type": "bytes"
                         },
                         {
+                            "doc": "Bloom filter entry valid/deleted flag",
                             "name": "isDeleted",
-                            "type": "boolean",
-                            "doc": "True if this file has been deleted"
+                            "type": "boolean"
+                        },
+                        {
+                            "doc": "Reserved bytes for future use",
+                            "name": "reserved",
+                            "type": "bytes"
+                        }
+                    ]
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of column ranges for all data files in the user table",
+            "name": "ColumnStatsMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file column ranges details",
+                    "name": "HoodieColumnStats",
+                    "type": "record",
+                    "fields": [
+                        {
+                            "doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type",
+                            "name": "minValue",
+                            "type": [
+                                "null",
+                                "string"
+                            ]
+                        },
+                        {
+                            "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
+                            "name": "maxValue",
+                            "type": [
+                                "null",
+                                "string"
+                            ]
+                        },
+                        {
+                            "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",

Review comment:
       I remember the RFC listing 6 entries to be added to the column stats partition. Do you plan to add them as a follow-up?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -124,6 +124,37 @@
       .sinceVersion("0.10.0")
       .withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");
 
+  public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER = ConfigProperty
+      .key(METADATA_PREFIX + ".index.bloomfilter.enable")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Enable indexing user data files bloom filters under metadata table. When enabled, "
+          + "a new partition under metadata table will be created to store the bloom filter index and will be "
+          + "used during the index lookups.");
+
+  public static final ConfigProperty<Boolean> ENABLE_META_INDEX_COLUMN_STATS = ConfigProperty
+      .key(METADATA_PREFIX + ".index.column.stats.enable")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Enable indexing user data files column ranges under metadata table key lookups. When "
+          + "enabled, a new partition under metadata table will be created to store the column ranges and will "
+          + "used for pruning files during the index lookups.");
+
+  public static final ConfigProperty<Boolean> META_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS = ConfigProperty
+      .key(METADATA_PREFIX + ".index.column.stats.all_columns")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Enable indexing user data files column ranges under metadata table key lookups. When "
+          + "enabled, a new partition under metadata table will be created to store the column ranges and will "
+          + "used for pruning files during the index lookups.");
+
+  public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER_BATCH_LOAD_MODE = ConfigProperty
+      .key(METADATA_PREFIX + ".index.bloomfilter.batchload.enable")

Review comment:
       Similarly, add a "." between batch and load in the key.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -124,6 +124,37 @@
       .sinceVersion("0.10.0")
       .withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");
 
+  public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER = ConfigProperty
+      .key(METADATA_PREFIX + ".index.bloomfilter.enable")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Enable indexing user data files bloom filters under metadata table. When enabled, "
+          + "a new partition under metadata table will be created to store the bloom filter index and will be "
+          + "used during the index lookups.");
+
+  public static final ConfigProperty<Boolean> ENABLE_META_INDEX_COLUMN_STATS = ConfigProperty
+      .key(METADATA_PREFIX + ".index.column.stats.enable")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Enable indexing user data files column ranges under metadata table key lookups. When "
+          + "enabled, a new partition under metadata table will be created to store the column ranges and will "

Review comment:
       Similar comment here.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -124,6 +124,37 @@
       .sinceVersion("0.10.0")
       .withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");
 
+  public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER = ConfigProperty
+      .key(METADATA_PREFIX + ".index.bloomfilter.enable")

Review comment:
       Can we have a "." between bloom and filter in the key?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -124,6 +124,37 @@
       .sinceVersion("0.10.0")
       .withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");
 
+  public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER = ConfigProperty
+      .key(METADATA_PREFIX + ".index.bloomfilter.enable")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Enable indexing user data files bloom filters under metadata table. When enabled, "
+          + "a new partition under metadata table will be created to store the bloom filter index and will be "

Review comment:
       Not sure we need to explicitly say "a new partition will be created". We can just say that the metadata table has a partition dedicated to storing bloom filters for all data files in hoodie.
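   Putting this and the key-naming comments together, the property could read roughly like this (sketch; exact wording is up to you):
   ```
   public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER = ConfigProperty
       .key(METADATA_PREFIX + ".index.bloom.filter.enable")   // "." between bloom and filter
       .defaultValue(false)
       .sinceVersion("0.11.0")
       .withDocumentation("Enable bloom filter index within the metadata table. When enabled, the metadata "
           + "table has a partition dedicated to storing bloom filters for all data files in hoodie, "
           + "which is used to speed up index lookups.");
   ```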

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -380,4 +382,75 @@ public Boolean apply(String recordKey) {
 
     return val;
   }
+
+  /**
+   * Parse min/max statistics stored in parquet footers for all columns.
+   */
+  public Collection<HoodieColumnStatsMetadata<Comparable>> readColumnStatsFromParquetMetadata(Configuration conf,
+                                                                                              String partitionPath,
+                                                                                              Path parquetFilePath,
+                                                                                              Option<List<String>> latestColumns) {
+
+    ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
+    // collect stats from all parquet blocks
+    Map<String, List<HoodieColumnStatsMetadata<Comparable>>> columnToStatsListMap =

Review comment:
       I am not doing a detailed review of this since we plan to re-use existing code.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,6 +155,97 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final PartitionIndexID partitionIndexID, final FileIndexID fileIndexID)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta bloom filter index is disabled!");

Review comment:
       Shouldn't we throw an exception here?
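   i.e. something like (sketch):
   ```
   if (!isMetaIndexBloomFilterEnabled) {
     throw new HoodieMetadataException("Meta bloom filter index is disabled; getBloomFilter() should not be called");
   }
   ```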

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,6 +155,97 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final PartitionIndexID partitionIndexID, final FileIndexID fileIndexID)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    final String bloomIndexKey = partitionIndexID.asBase64EncodedString().concat(fileIndexID.asBase64EncodedString());
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(bloomIndexKey,
+        MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));
+
+    if (!hoodieRecord.isPresent()) {
+      LOG.error("Meta bloom filter index: lookup failed for partition: " + partitionIndexID.getName() + ", file: "
+          + fileIndexID.getName());
+      return Option.empty();
+    }
+
+    final Option<HoodieMetadataBloomFilter> fileBloomFilter = hoodieRecord.get().getData().getBloomFilterMetadata();
+    if (!fileBloomFilter.isPresent()) {
+      LOG.error("Meta bloom filter index: bloom filter missing for partition: " + partitionIndexID.getName()
+          + ", file: " + fileIndexID.getName());
+      return Option.empty();
+    }
+
+    return Option.of(fileBloomFilter.get().getBloomFilter());
+  }
+
+  @Override
+  public Map<String, ByteBuffer> getBloomFilters(final List<Pair<PartitionIndexID, FileIndexID>> partitionFileIndexIDList)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta bloom filter index is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    partitionFileIndexIDList.forEach(partitionIDFileIDPair -> {
+          final String bloomKey = partitionIDFileIDPair.getLeft().asBase64EncodedString()
+              .concat(partitionIDFileIDPair.getRight().asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+        }
+    );
+    List<String> partitionIDFileIDStrings = new ArrayList<>(partitionIDFileIDSortedStrings);
+
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));
+
+    Map<String, ByteBuffer> fileToBloomFilterMap = new HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieMetadataBloomFilter> optionalBloomFilterMetadata =
+            entry.getRight().get().getData().getBloomFilterMetadata();
+        if (optionalBloomFilterMetadata.isPresent()) {
+          fileToBloomFilterMap.put(entry.getLeft(), optionalBloomFilterMetadata.get().getBloomFilter());
+        }

Review comment:
       Shouldn't we log an error here if the entry is not found, similar to the single-record lookup?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -233,38 +250,78 @@ private void initIfNeeded() {
   }
 
   /**
-   * Returns a new pair of readers to the base and log files.
+   * Get the file slice details for the given key in a partition.
+   *
+   * @param partitionName - Metadata partition name
+   * @param key           - Key to get the file slice for
+   * @return Partition and file slice pair for the given key
    */
-  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String key, String partitionName) {
-    return partitionReaders.computeIfAbsent(partitionName, k -> {
-      try {
-        final long baseFileOpenMs;
-        final long logScannerOpenMs;
-        HoodieFileReader baseFileReader = null;
-        HoodieMetadataMergedLogRecordReader logRecordScanner = null;
+  private Pair<String, FileSlice> getPartitionFileSlice(final String partitionName, final String key) {
+    // Metadata is in sync till the latest completed instant on the dataset
+    List<FileSlice> latestFileSlices =
+        HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
+    Option<MetadataPartitionType> partitionType = HoodieTableMetadataUtil.fromPartitionPath(partitionName);
+    ValidationUtils.checkArgument(partitionType.isPresent());
+    ValidationUtils.checkArgument(latestFileSlices.size() == partitionType.get().getFileGroupCount(),
+        String.format("Invalid number of file slices: found=%d, required=%d", latestFileSlices.size(),
+            partitionType.get().getFileGroupCount()));
+    final FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,

Review comment:
       So `List<FileSlice> latestFileSlices` is guaranteed to have a deterministic ordering, is it? I see we are just fetching the slice by index here.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,6 +155,97 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final PartitionIndexID partitionIndexID, final FileIndexID fileIndexID)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    final String bloomIndexKey = partitionIndexID.asBase64EncodedString().concat(fileIndexID.asBase64EncodedString());
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(bloomIndexKey,
+        MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));
+
+    if (!hoodieRecord.isPresent()) {
+      LOG.error("Meta bloom filter index: lookup failed for partition: " + partitionIndexID.getName() + ", file: "
+          + fileIndexID.getName());
+      return Option.empty();
+    }
+
+    final Option<HoodieMetadataBloomFilter> fileBloomFilter = hoodieRecord.get().getData().getBloomFilterMetadata();
+    if (!fileBloomFilter.isPresent()) {
+      LOG.error("Meta bloom filter index: bloom filter missing for partition: " + partitionIndexID.getName()

Review comment:
       Do you know when this could happen?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -124,14 +205,107 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
     return records;
   }
 
+  /**
+   * Convert commit action metadata to bloom filter records.
+   *
+   * @param commitMetadata - Commit action metadata
+   * @param dataMetaClient - Meta client for the data table
+   * @param instantTime    - Action instant time
+   * @return List of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
+                                                                       HoodieTableMetaClient dataMetaClient,
+                                                                       String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
+      final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
+      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
+      writeStats.forEach(hoodieWriteStat -> {
+        // No action for delta logs
+        if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
+          return;
+        }
+
+        String pathWithPartition = hoodieWriteStat.getPath();
+        if (pathWithPartition == null) {
+          // Empty partition
+          LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat);
+          return;
+        }
+
+        int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) :
+            partition.length() + 1;
+
+        final String fileName = pathWithPartition.substring(offset);
+        ValidationUtils.checkState(FSUtils.isBaseFile(new Path(fileName)));

Review comment:
       For MOR, we can get log files here too. Why add a constraint that it has to be a base file?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -125,30 +129,43 @@ private void initIfNeeded() {
     return recordsByKeys.size() == 0 ? Option.empty() : recordsByKeys.get(0).getValue();
   }
 
-  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, String partitionName) {
-    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(keys.get(0), partitionName);
-    try {
-      List<Long> timings = new ArrayList<>();
-      HoodieFileReader baseFileReader = readers.getKey();
-      HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+  @Override
+  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
+                                                                                             String partitionName) {
+    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSlices(partitionName, keys);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
+    AtomicInteger fileSlicesKeysCount = new AtomicInteger();
+    partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
+      Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(partitionName,
+          partitionFileSlicePair.getRight());
+      try {
+        List<Long> timings = new ArrayList<>();
+        HoodieFileReader baseFileReader = readers.getKey();
+        HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
 
-      if (baseFileReader == null && logRecordScanner == null) {
-        return Collections.emptyList();
-      }
+        if (baseFileReader == null && logRecordScanner == null) {
+          return;
+        }
 
-      // local map to assist in merging with base file records
-      Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner, keys, timings);
-      List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = readFromBaseAndMergeWithLogRecords(
-          baseFileReader, keys, logRecords, timings, partitionName);
-      LOG.info(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", keys.size(), timings));
-      return result;
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error merging records from metadata table for  " + keys.size() + " key : ", ioe);
-    } finally {
-      if (!reuse) {
-        close(partitionName);
+        // local map to assist in merging with base file records
+        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner,
+            fileSliceKeys, timings);
+        result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, logRecords,

Review comment:
       readFromBaseAndMergeWithLogRecords was written assuming the FILES partition, so if a key is not found it returns Option.empty. Here, with multiple file slices, I assume we ensure each key is looked up only in its respective file slice? If not, it will lead to data loss.
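       (For reference, a minimal sketch of the invariant being asked about, assuming `fileSlices` is the ordered list of file slices for the partition and reusing the existing `mapRecordKeyToFileGroupIndex` helper; the variable names are illustrative, not the PR's code.)
   ```
   // Hedged sketch: bucket keys by the file group that owns them, so a miss in a
   // slice's base + log files genuinely means the key does not exist.
   Map<Integer, List<String>> keysByFileGroup = new HashMap<>();
   for (String key : keys) {
     int fileGroupIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, fileSlices.size());
     keysByFileGroup.computeIfAbsent(fileGroupIndex, idx -> new ArrayList<>()).add(key);
   }
   // Each bucket is then read only against fileSlices.get(fileGroupIndex); a key is never
   // probed in a slice that could not contain it.
   ```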

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +635,90 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
     return records;
   }
 
+  /**
+   * Convert rollback action metadata to bloom filter index records.
+   */
+  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                     HoodieTableMetaClient dataMetaClient,
+                                                                     Map<String, List<String>> partitionToDeletedFiles,
+                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {

Review comment:
       Again, why do we have this constraint?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -150,48 +324,187 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
     return records;
   }
 
+  /**
+   * Convert clean metadata to bloom filter index records.
+   *
+   * @param cleanMetadata - Clean action metadata
+   * @param engineContext - Engine context
+   * @param instantTime   - Clean action instant time
+   * @return List of bloom filter index records for the clean metadata
+   */
+  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
+                                                                       HoodieEngineContext engineContext,
+                                                                       String instantTime) {
+    List<Pair<String, String>> deleteFileList = new ArrayList<>();
+    cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
+      // Files deleted from a partition
+      List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
+      deletedFiles.forEach(entry -> {
+        if (FSUtils.isBaseFile(new Path(entry))) {

Review comment:
       Why only base files?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +635,90 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
     return records;
   }
 
+  /**
+   * Convert rollback action metadata to bloom filter index records.
+   */
+  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                     HoodieTableMetaClient dataMetaClient,
+                                                                     Map<String, List<String>> partitionToDeletedFiles,
+                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+        return;
+      }
+
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+          new PartitionIndexID(partition), new FileIndexID(deletedFile),
+          instantTime, ByteBuffer.allocate(0), true));
+    }));
+
+    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      appendedFileMap.forEach((appendedFile, length) -> {
+        if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+          return;
+        }
+        final String pathWithPartition = partitionName + "/" + appendedFile;
+        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
+        try {
+          HoodieFileReader<IndexedRecord> fileReader =
+              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath);
+          final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+          if (fileBloomFilter == null) {
+            LOG.error("Failed to read bloom filter for " + appendedFilePath);
+            return;
+          }
+          ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+          HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
+              new PartitionIndexID(partition), new FileIndexID(appendedFile), instantTime,
+              bloomByteBuffer, false);
+          records.add(record);
+          fileReader.close();
+        } catch (IOException e) {
+          LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
+        }
+      });
+    });
+    return records;
+  }
+
+  /**
+   * Convert rollback action metadata to column stats index records.
+   */
+  private static List<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                     HoodieTableMetaClient datasetMetaClient,
+                                                                     Map<String, List<String>> partitionToDeletedFiles,
+                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    List<String> latestColumns = getLatestColumns(datasetMetaClient);
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;

Review comment:
       Can you move this into a helper method? (A possible extraction is sketched after the snippet below.)
   ```
   partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName
   ```
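       For instance, a small helper along these lines (the method name is just a suggestion, not the PR's code):
   ```
   // Hypothetical helper; EMPTY_PARTITION_NAME and NON_PARTITIONED_NAME are the
   // constants already used throughout this class.
   private static String getPartitionIdentifier(String partitionName) {
     return EMPTY_PARTITION_NAME.equals(partitionName) ? NON_PARTITIONED_NAME : partitionName;
   }
   ```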

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +635,90 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
     return records;
   }
 
+  /**
+   * Convert rollback action metadata to bloom filter index records.
+   */
+  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                     HoodieTableMetaClient dataMetaClient,
+                                                                     Map<String, List<String>> partitionToDeletedFiles,
+                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+        return;
+      }
+
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+          new PartitionIndexID(partition), new FileIndexID(deletedFile),
+          instantTime, ByteBuffer.allocate(0), true));
+    }));
+
+    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {

Review comment:
       Appended files are always log files in my understanding. So do we skip processing them entirely for the bloom filter partition?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -399,4 +796,110 @@ public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGrou
     return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
   }
 
+  public static List<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
+                                                                       HoodieEngineContext engineContext,
+                                                                       HoodieTableMetaClient dataMetaClient,
+                                                                       boolean isMetaIndexColumnStatsForAllColumns,
+                                                                       String instantTime) {
+
+    try {
+      List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+      return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, dataMetaClient, allWriteStats,
+          isMetaIndexColumnStatsForAllColumns);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate column stats records for metadata table ", e);
+    }
+  }
+
+  /**
+   * @param engineContext
+   * @param datasetMetaClient
+   * @param allWriteStats
+   * @param isMetaIndexColumnStatsForAllColumns
+   */
+  public static List<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
+                                                                   HoodieTableMetaClient datasetMetaClient,
+                                                                   List<HoodieWriteStat> allWriteStats,
+                                                                   boolean isMetaIndexColumnStatsForAllColumns) throws Exception {
+    if (allWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<HoodieWriteStat> prunedWriteStats = allWriteStats.stream().filter(writeStat -> {
+      return !(writeStat instanceof HoodieDeltaWriteStat);

Review comment:
       Do you know if HoodieDeltaWriteStat is used only for delta commits writing log files? What's the write stat class used with delta commits writing base files?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -45,36 +57,64 @@
 import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
 
 /**
- * This is a payload which saves information about a single entry in the Metadata Table.
- *
- * The type of the entry is determined by the "type" saved within the record. The following types of entries are saved:
- *
- *   1. List of partitions: There is a single such record
- *         key="__all_partitions__"
- *
- *   2. List of files in a Partition: There is one such record for each partition
- *         key=Partition name
- *
- *  During compaction on the table, the deletions are merged with additions and hence pruned.
- *
- * Metadata Table records are saved with the schema defined in HoodieMetadata.avsc. This class encapsulates the
- * HoodieMetadataRecord for ease of operations.
+ * MetadataTable records are persisted with the schema defined in HoodieMetadata.avsc.
+ * This class represents the payload for the MetadataTable.
+ * <p>
+ * This single metadata payload is shared by all the partitions under the metadata table.
+ * The partition specific records are determined by the field "type" saved within the record.
+ * The following types are supported:
+ * <p>
+ * METADATA_TYPE_PARTITION_LIST (1):
+ * -- List of all partitions. There is a single such record
+ * -- key = @{@link HoodieTableMetadata.RECORDKEY_PARTITION_LIST}
+ * <p>
+ * METADATA_TYPE_FILE_LIST (2):
+ * -- List of all files in a partition. There is one such record for each partition
+ * -- key = partition name
+ * <p>
+ * METADATA_TYPE_COLUMN_STATS (3):
+ * -- This is an index for column stats in the table
+ * <p>
+ * METADATA_TYPE_BLOOM_FILTER (4):
+ * -- This is an index for base file bloom filters. This is a map of FileID to its BloomFilter byte[].
+ * <p>
+ * During compaction on the table, the deletions are merged with additions and hence records are pruned.
  */
 public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadataPayload> {
 
+  // Type of the record. This can be an enum in the schema but Avro1.8
+  // has a bug - https://issues.apache.org/jira/browse/AVRO-1810
+  protected static final int METADATA_TYPE_PARTITION_LIST = 1;
+  protected static final int METADATA_TYPE_FILE_LIST = 2;
+  protected static final int METADATA_TYPE_COLUMN_STATS = 3;
+  protected static final int METADATA_TYPE_BLOOM_FILTER = 4;
+
   // HoodieMetadata schema field ids
   public static final String SCHEMA_FIELD_ID_KEY = "key";
   public static final String SCHEMA_FIELD_ID_TYPE = "type";
-  public static final String SCHEMA_FIELD_ID_METADATA = "filesystemMetadata";
+  public static final String SCHEMA_FIELD_ID_FILESYSTEM = "filesystemMetadata";
+  private static final String SCHEMA_FIELD_ID_COLUMN_STATS = "ColumnStatsMetadata";
+  private static final String SCHEMA_FIELD_ID_BLOOM_FILTER = "BloomFilterMetadata";
 
-  // Type of the record
-  // This can be an enum in the schema but Avro 1.8 has a bug - https://issues.apache.org/jira/browse/AVRO-1810
-  private static final int PARTITION_LIST = 1;
-  private static final int FILE_LIST = 2;
+  // HoodieMetadata bloom filter payload field ids
+  private static final String BLOOM_FILTER_FIELD_TYPE = "type";
+  private static final String BLOOM_FILTER_FIELD_TIMESTAMP = "timestamp";
+  private static final String BLOOM_FILTER_FIELD_BLOOM_FILTER = "bloomFilter";
+  private static final String BLOOM_FILTER_FIELD_IS_DELETED = "isDeleted";
+  private static final String BLOOM_FILTER_FIELD_RESERVED = "reserved";
+
+  // HoodieMetadata column stats payload field ids
+  private static final String COLUMN_STATS_FIELD_MIN_VALUE = "minValue";
+  private static final String COLUMN_STATS_FIELD_MAX_VALUE = "maxValue";
+  private static final String COLUMN_STATS_FIELD_NULL_COUNT = "nullCount";
+  private static final String COLUMN_STATS_FIELD_IS_DELETED = "isDeleted";

Review comment:
       Can we define a single shared constant for "isDeleted"?
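       For example (a sketch only; the constant name is hypothetical, but the field name is identical in both record types):
   ```
   // Hypothetical shared constant reused by both the bloom filter and column stats payloads.
   private static final String FIELD_IS_DELETED = "isDeleted";
   ```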

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +194,92 @@ private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false)));
 
     HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have been added to this partition
    * @param filesDeleted List of files which have been deleted from this partition
    */
   public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(String partition,
-                                                                               Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               Option<Map<String, Long>> filesAdded,
+                                                                               Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true))));
 
     HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param fileID      - FileID for which the bloom filter needs to persisted
+   * @param timestamp   - Instant timestamp responsible for this record
+   * @param bloomFilter - Bloom filter for the File
+   * @param isDeleted   - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(final PartitionIndexID partitionID,
+                                                                                    final FileIndexID fileID,
+                                                                                    final String timestamp,
+                                                                                    final ByteBuffer bloomFilter,
+                                                                                    final boolean isDeleted) {
+    final String bloomFilterKey = partitionID.asBase64EncodedString().concat(fileID.asBase64EncodedString());
+    HoodieKey key = new HoodieKey(bloomFilterKey, MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+
+    // TODO: Get the bloom filter type from the file

Review comment:
       Do you plan to fix this in this patch or in a follow-up?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +194,92 @@ private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false)));
 
     HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have been added to this partition
    * @param filesDeleted List of files which have been deleted from this partition
    */
   public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(String partition,
-                                                                               Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               Option<Map<String, Long>> filesAdded,
+                                                                               Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true))));
 
     HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param fileID      - FileID for which the bloom filter needs to persisted
+   * @param timestamp   - Instant timestamp responsible for this record
+   * @param bloomFilter - Bloom filter for the File
+   * @param isDeleted   - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(final PartitionIndexID partitionID,
+                                                                                    final FileIndexID fileID,
+                                                                                    final String timestamp,
+                                                                                    final ByteBuffer bloomFilter,
+                                                                                    final boolean isDeleted) {
+    final String bloomFilterKey = partitionID.asBase64EncodedString().concat(fileID.asBase64EncodedString());
+    HoodieKey key = new HoodieKey(bloomFilterKey, MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+
+    // TODO: Get the bloom filter type from the file
+    HoodieMetadataBloomFilter metadataBloomFilter =
+        new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+            timestamp, bloomFilter, isDeleted, ByteBuffer.allocate(0));
+    HoodieMetadataPayload metadataPayload = new HoodieMetadataPayload(key.getRecordKey(),
+        HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+    return new HoodieRecord<>(key, metadataPayload);
+  }
+
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
     ValidationUtils.checkArgument(previousRecord.type == type,
-        "Cannot combine " + previousRecord.type  + " with " + type);
-
-    Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+        "Cannot combine " + previousRecord.type + " with " + type);
 
     switch (type) {
-      case PARTITION_LIST:
-      case FILE_LIST:
-        combinedFileInfo = combineFilesystemMetadata(previousRecord);
-        break;
+      case METADATA_TYPE_PARTITION_LIST:
+      case METADATA_TYPE_FILE_LIST:
+        Map<String, HoodieMetadataFileInfo> combinedFileInfo = combineFilesystemMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combinedFileInfo);
+      case METADATA_TYPE_BLOOM_FILTER:
+        HoodieMetadataBloomFilter combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combineBloomFilterMetadata);
+      case METADATA_TYPE_COLUMN_STATS:
+        return new HoodieMetadataPayload(key, type, combineColumnStats(previousRecord));
       default:
         throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type);
     }
+  }
 
-    return new HoodieMetadataPayload(key, type, combinedFileInfo);
+  private HoodieMetadataBloomFilter combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
+    return this.bloomFilterMetadata;

Review comment:
       We don't need to merge anything here, do we? We just choose the latest version? Is it intentionally designed this way?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +194,92 @@ private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false)));
 
     HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have been added to this partition
    * @param filesDeleted List of files which have been deleted from this partition
    */
   public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(String partition,
-                                                                               Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               Option<Map<String, Long>> filesAdded,
+                                                                               Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true))));
 
     HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param fileID      - FileID for which the bloom filter needs to persisted
+   * @param timestamp   - Instant timestamp responsible for this record
+   * @param bloomFilter - Bloom filter for the File
+   * @param isDeleted   - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(final PartitionIndexID partitionID,
+                                                                                    final FileIndexID fileID,
+                                                                                    final String timestamp,
+                                                                                    final ByteBuffer bloomFilter,
+                                                                                    final boolean isDeleted) {
+    final String bloomFilterKey = partitionID.asBase64EncodedString().concat(fileID.asBase64EncodedString());
+    HoodieKey key = new HoodieKey(bloomFilterKey, MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+
+    // TODO: Get the bloom filter type from the file
+    HoodieMetadataBloomFilter metadataBloomFilter =
+        new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+            timestamp, bloomFilter, isDeleted, ByteBuffer.allocate(0));
+    HoodieMetadataPayload metadataPayload = new HoodieMetadataPayload(key.getRecordKey(),
+        HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+    return new HoodieRecord<>(key, metadataPayload);
+  }
+
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
     ValidationUtils.checkArgument(previousRecord.type == type,
-        "Cannot combine " + previousRecord.type  + " with " + type);
-
-    Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+        "Cannot combine " + previousRecord.type + " with " + type);
 
     switch (type) {
-      case PARTITION_LIST:
-      case FILE_LIST:
-        combinedFileInfo = combineFilesystemMetadata(previousRecord);
-        break;
+      case METADATA_TYPE_PARTITION_LIST:
+      case METADATA_TYPE_FILE_LIST:
+        Map<String, HoodieMetadataFileInfo> combinedFileInfo = combineFilesystemMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combinedFileInfo);
+      case METADATA_TYPE_BLOOM_FILTER:
+        HoodieMetadataBloomFilter combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combineBloomFilterMetadata);
+      case METADATA_TYPE_COLUMN_STATS:
+        return new HoodieMetadataPayload(key, type, combineColumnStats(previousRecord));
       default:
         throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type);
     }
+  }
 
-    return new HoodieMetadataPayload(key, type, combinedFileInfo);
+  private HoodieMetadataBloomFilter combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
+    return this.bloomFilterMetadata;
+  }
+
+  private HoodieColumnStats combineColumnStats(HoodieMetadataPayload previousRecord) {
+    return this.columnStatMetadata;

Review comment:
       Same question as above.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +635,90 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
     return records;
   }
 
+  /**
+   * Convert rollback action metadata to bloom filter index records.
+   */
+  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                     HoodieTableMetaClient dataMetaClient,
+                                                                     Map<String, List<String>> partitionToDeletedFiles,
+                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {

Review comment:
       Oh, because we don't store non-base files in the bloom filter partition.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -380,4 +382,76 @@ public Boolean apply(String recordKey) {
 
     return val;
   }
+
+  /**
+   * Parse min/max statistics stored in parquet footers for all columns.
+   */
+  public Collection<HoodieColumnStatsMetadata<Comparable>> readColumnStatsFromParquetMetadata(Configuration conf,
+                                                                                              String partitionPath,

Review comment:
       Is the comment addressed?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -234,13 +379,65 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
     return combinedFileInfo;
   }
 
+  public static Stream<HoodieRecord> createColumnStatsRecords(
+      Collection<HoodieColumnStatsMetadata<Comparable>> columnRangeInfo) {
+    return columnRangeInfo.stream().map(columnStatsMetadata -> {
+      HoodieKey key = new HoodieKey(getColumnStatsRecordKey(columnStatsMetadata),
+          MetadataPartitionType.COLUMN_STATS.partitionPath());
+
+      HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_COLUMN_STATS,
+          HoodieColumnStats.newBuilder()
+              .setMinValue(columnStatsMetadata.getMinValue() == null ? null :
+                  new String(((Binary) columnStatsMetadata.getMinValue()).getBytes()))
+              .setMaxValue(columnStatsMetadata.getMaxValue() == null ? null :
+                  new String(((Binary) columnStatsMetadata.getMaxValue()).getBytes()))
+              .setNullCount(columnStatsMetadata.getNullCount())
+              .setIsDeleted(false)
+              .setReserved(ByteBuffer.allocate(0))
+              .build());
+
+      return new HoodieRecord<>(key, payload);
+    });
+  }
+
+  // get record key from column stats metadata
+  public static String getColumnStatsRecordKey(HoodieColumnStatsMetadata<Comparable> columnStatsMetadata) {
+    final ColumnIndexID columnID = new ColumnIndexID(columnStatsMetadata.getColumnName());
+    final PartitionIndexID partitionID = new PartitionIndexID(columnStatsMetadata.getPartitionPath());
+    final FileIndexID fileID = new FileIndexID(FSUtils.getFileId(new Path(columnStatsMetadata.getFilePath()).getName()));
+    return columnID.asBase64EncodedString()
+        .concat(partitionID.asBase64EncodedString())
+        .concat(fileID.asBase64EncodedString());
+  }
+
+  // parse attribute in record key. TODO: find better way to get this attribute instaed of parsing key
+  public static String getAttributeFromRecordKey(String recordKey, String attribute) {
+    final String columnIDBase64EncodedString = recordKey.substring(0, ColumnIndexID.ID_COLUMN_HASH_SIZE.bits());
+
+    return "";

Review comment:
       Is this used anywhere? Why return an empty string?
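       (If the parsing is meant to be filled in later, a hedged sketch of what it could look like, assuming the key is three fixed-length base64 segments in the order columnId | partitionId | fileId; the length parameters are placeholders, not the real hash sizes.)
   ```
   // Hypothetical sketch only: recover the three ID segments from the concatenated key.
   static String[] splitColumnStatsRecordKey(String recordKey, int columnIdLen, int partitionIdLen) {
     String columnId = recordKey.substring(0, columnIdLen);
     String partitionId = recordKey.substring(columnIdLen, columnIdLen + partitionIdLen);
     String fileId = recordKey.substring(columnIdLen + partitionIdLen);
     return new String[] {columnId, partitionId, fileId};
   }
   ```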

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +194,92 @@ private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false)));
 
     HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have been added to this partition
    * @param filesDeleted List of files which have been deleted from this partition
    */
   public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(String partition,
-                                                                               Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               Option<Map<String, Long>> filesAdded,
+                                                                               Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true))));
 
     HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param fileID      - FileID for which the bloom filter needs to persisted
+   * @param timestamp   - Instant timestamp responsible for this record
+   * @param bloomFilter - Bloom filter for the File
+   * @param isDeleted   - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(final PartitionIndexID partitionID,
+                                                                                    final FileIndexID fileID,
+                                                                                    final String timestamp,
+                                                                                    final ByteBuffer bloomFilter,
+                                                                                    final boolean isDeleted) {
+    final String bloomFilterKey = partitionID.asBase64EncodedString().concat(fileID.asBase64EncodedString());
+    HoodieKey key = new HoodieKey(bloomFilterKey, MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+
+    // TODO: Get the bloom filter type from the file
+    HoodieMetadataBloomFilter metadataBloomFilter =
+        new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+            timestamp, bloomFilter, isDeleted, ByteBuffer.allocate(0));
+    HoodieMetadataPayload metadataPayload = new HoodieMetadataPayload(key.getRecordKey(),
+        HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+    return new HoodieRecord<>(key, metadataPayload);
+  }
+
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
     ValidationUtils.checkArgument(previousRecord.type == type,
-        "Cannot combine " + previousRecord.type  + " with " + type);
-
-    Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+        "Cannot combine " + previousRecord.type + " with " + type);
 
     switch (type) {
-      case PARTITION_LIST:
-      case FILE_LIST:
-        combinedFileInfo = combineFilesystemMetadata(previousRecord);
-        break;
+      case METADATA_TYPE_PARTITION_LIST:
+      case METADATA_TYPE_FILE_LIST:
+        Map<String, HoodieMetadataFileInfo> combinedFileInfo = combineFilesystemMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combinedFileInfo);
+      case METADATA_TYPE_BLOOM_FILTER:
+        HoodieMetadataBloomFilter combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combineBloomFilterMetadata);
+      case METADATA_TYPE_COLUMN_STATS:
+        return new HoodieMetadataPayload(key, type, combineColumnStats(previousRecord));
       default:
         throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type);
     }
+  }
 
-    return new HoodieMetadataPayload(key, type, combinedFileInfo);
+  private HoodieMetadataBloomFilter combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
+    return this.bloomFilterMetadata;
+  }
+
+  private HoodieColumnStats combineColumnStats(HoodieMetadataPayload previousRecord) {
+    return this.columnStatMetadata;

Review comment:
       If the new record is deleted, we should still be fine here, right? Can you confirm the flow, please?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +635,90 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
     return records;
   }
 
+  /**
+   * Convert rollback action metadata to bloom filter index records.
+   */
+  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                     HoodieTableMetaClient dataMetaClient,
+                                                                     Map<String, List<String>> partitionToDeletedFiles,
+                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+        return;
+      }
+
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+          new PartitionIndexID(partition), new FileIndexID(deletedFile),
+          instantTime, ByteBuffer.allocate(0), true));
+    }));
+
+    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      appendedFileMap.forEach((appendedFile, length) -> {
+        if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+          return;
+        }
+        final String pathWithPartition = partitionName + "/" + appendedFile;
+        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
+        try {
+          HoodieFileReader<IndexedRecord> fileReader =
+              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath);
+          final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+          if (fileBloomFilter == null) {
+            LOG.error("Failed to read bloom filter for " + appendedFilePath);
+            return;
+          }
+          ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+          HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
+              new PartitionIndexID(partition), new FileIndexID(appendedFile), instantTime,
+              bloomByteBuffer, false);
+          records.add(record);
+          fileReader.close();
+        } catch (IOException e) {
+          LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
+        }
+      });
+    });
+    return records;
+  }
+
+  /**
+   * Convert rollback action metadata to column stats index records.
+   */
+  private static List<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                     HoodieTableMetaClient datasetMetaClient,
+                                                                     Map<String, List<String>> partitionToDeletedFiles,
+                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    List<String> latestColumns = getLatestColumns(datasetMetaClient);
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+      if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        final String filePathWithPartition = partitionName + "/" + deletedFile;
+        records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient,

Review comment:
       Also, for a deleted file, all we need to do is add a deleted entry to the column stats partition, right? We should never need to read the file at all. Can you please clarify?
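       (To make the suggestion concrete, a hedged sketch of a delete-marker factory as if added to HoodieMetadataPayload, mirroring createBloomFilterMetadataRecord; this method does not exist in the PR and its name is hypothetical.)
   ```
   // Hypothetical factory: a column stats delete marker for a removed file, so the file
   // never has to be re-read just to record its deletion.
   public static HoodieRecord<HoodieMetadataPayload> createColumnStatsDeleteRecord(
       final ColumnIndexID columnID, final PartitionIndexID partitionID, final FileIndexID fileID) {
     final String recordKey = columnID.asBase64EncodedString()
         .concat(partitionID.asBase64EncodedString())
         .concat(fileID.asBase64EncodedString());
     HoodieKey key = new HoodieKey(recordKey, MetadataPartitionType.COLUMN_STATS.partitionPath());
     HoodieColumnStats stats = HoodieColumnStats.newBuilder()
         .setMinValue(null)
         .setMaxValue(null)
         .setNullCount(0L)
         .setIsDeleted(true)
         .setReserved(ByteBuffer.allocate(0))
         .build();
     return new HoodieRecord<>(key, new HoodieMetadataPayload(key.getRecordKey(),
         METADATA_TYPE_COLUMN_STATS, stats));
   }
   ```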




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org