Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/26 08:58:27 UTC

[GitHub] [hudi] prashantwason commented on a change in pull request #4352: [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

prashantwason commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r792019607



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1444,6 +1444,14 @@ public boolean useBloomIndexBucketizedChecking() {
     return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
   }
 
+  public boolean isMetadataIndexBloomFilterEnabled() {
+    return isMetadataTableEnabled() && getMetadataConfig().isMetadataIndexBloomFilterEnabled();
+  }
+
+  public boolean isMetadataIndexColumnStatsForAllColumns() {

Review comment:
       No "Enabled" suffix here? For consistency with other such configs, maybe rename it to isMetadataColumnStatsIndexEnabled.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +144,89 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
   /**
    * Load all involved files as <Partition, filename> pair List.
    */
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
       List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get BloomIndexFileInfo for all the latest base files for the requested partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    // also obtain file ranges, if range pruning is enabled
+    context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices");
+
+    final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    return context.flatMap(partitions, partitionName -> {
+      List<String> partitionFileNameList = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+              hoodieTable).stream().map(baseFile -> baseFile.getFileName())
+          .collect(toList());
+      try {
+        List<Pair<String, String>> columnStatKeys = new ArrayList<>();

Review comment:
       A simpler way to implement this is to stream over partitionFileNameList instead of using a for-loop:
    
    HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName, hoodieTable).stream()
        .map(baseFile -> baseFile.getFileName())
        .map(fileName -> Pair.of(partitionName, fileName))
        .sorted()
        .collect(toList());

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -101,4 +116,34 @@ public static HoodieRecord getTaggedRecord(HoodieRecord inputRecord, Option<Hood
     }
     return record;
   }
+
+  /**
+   * Given a list of row keys and one file, return only row keys existing in that file.
+   *
+   * @param filePath            - File to filter keys from
+   * @param candidateRecordKeys - Candidate keys to filter
+   * @return List of candidate keys that are available in the file
+   */
+  public static List<String> filterKeysFromFile(Path filePath, List<String> candidateRecordKeys,
+                                                Configuration configuration) throws HoodieIndexException {
+    ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
+    List<String> foundRecordKeys = new ArrayList<>();
+    try {
+      // Load all rowKeys from the file, to double-confirm
+      if (!candidateRecordKeys.isEmpty()) {
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
+        Set<String> fileRowKeys = fileReader.filterKeys(new TreeSet<>(candidateRecordKeys));

Review comment:
       TreeSet or HashSet here? Lookups are much faster in a hash-based set.
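    
    For illustration only, a minimal sketch of the hash-based membership check this is getting at; fileRowKeys is a hypothetical stand-in for whatever keys the reader returns (HashSet.contains() is O(1) on average versus O(log n) for a TreeSet):
    
        Set<String> candidates = new HashSet<>(candidateRecordKeys);
        List<String> foundRecordKeys = new ArrayList<>();
        for (String rowKey : fileRowKeys) {
          // Keep only the candidate keys that are actually present in the file.
          if (candidates.contains(rowKey)) {
            foundRecordKeys.add(rowKey);
          }
        }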

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -128,8 +130,21 @@
     this.dataWriteConfig = writeConfig;
     this.engineContext = engineContext;
     this.hadoopConf = new SerializableConfiguration(hadoopConf);
+    this.metrics = Option.empty();
+    this.enabledPartitionTypes = new ArrayList<>();
 
     if (writeConfig.isMetadataTableEnabled()) {
+      this.enabledPartitionTypes.add(MetadataPartitionType.FILES);
+      if (writeConfig.getMetadataConfig().isMetadataIndexBloomFilterEnabled()) {
+        MetadataPartitionType.BLOOM_FILTERS.setFileGroupCount(
+            writeConfig.getMetadataConfig().getMetadataIndexBloomFilterFileGroupCount());
+        this.enabledPartitionTypes.add(MetadataPartitionType.BLOOM_FILTERS);
+      }
+      if (writeConfig.getMetadataConfig().isMetadataIndexColumnStatsEnabled()) {
+        MetadataPartitionType.COLUMN_STATS.setFileGroupCount(

Review comment:
       Same as above. We should not require this to be specified all the time.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in an iterative fashion. In each iteration,
+ * bloom filters are requested for a batch of partition files and the
+ * keys are checked against them.
+ */
+public class HoodieMetadataBloomIndexCheckFunction implements

Review comment:
       I did not get why a new file is required for the metadata-specific bloom check.
   
   

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -171,17 +172,34 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
-
-    return recordsRDD.map(r -> {
-      FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
-      r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
-      return r;
-    });
+  private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {

Review comment:
       This can be moved to the base class. Some ideas applicable here:
     1. HoodieData.map() to tag records instead of RDD.map
     2. HoodieData.union() instead of RDD.union
    
    If so, you don't need a Flink-specific implementation either (see the sketch below).
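    
    For illustration, a rough sketch of what an engine-agnostic prepRecords could look like, assuming HoodieData exposes map() and union() as suggested above (method names and signatures are assumptions, not the verified API):
    
        private HoodieData<HoodieRecord> prepRecords(Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
          HoodieData<HoodieRecord> allPartitionRecords = null;
          for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : partitionRecordsMap.entrySet()) {
            final String partitionName = entry.getKey().getPartitionPath();
            final List<FileSlice> fileSlices =
                HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
            final int fileGroupCount = fileSlices.size();
            // Tag each record with the file slice chosen by hashing its record key.
            HoodieData<HoodieRecord> tagged = entry.getValue().map(r -> {
              FileSlice slice = fileSlices.get(
                  HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), fileGroupCount));
              r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
              return r;
            });
            // Accumulate the tagged records across all enabled partition types.
            allPartitionRecords = (allPartitionRecords == null) ? tagged : allPartitionRecords.union(tagged);
          }
          return allPartitionRecords;
        }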

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,119 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final String fileName)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filters is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filter is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+              .concat(new FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+          fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+        }
+    );
+
+    List<String> partitionIDFileIDStrings = new ArrayList<>(partitionIDFileIDSortedStrings);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));
+
+    Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
+            entry.getRight().get().getData().getBloomFilterMetadata();
+        if (bloomFilterMetadata.isPresent()) {
+          if (!bloomFilterMetadata.get().getIsDeleted()) {
+            ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
+            partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), bloomFilterMetadata.get().getBloomFilter());

Review comment:
       Shouldn't we return the actual bloom filter implementation here instead of the raw byte array?
    
    I see that the schema also saves a "Bloom filter type code", which means different types of bloom filters are possible. So how will the caller know which bloom filter type to instantiate from the byte array?
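    
    To illustrate the type-code concern, a hedged sketch of how a caller could rehydrate the filter, assuming the byte buffer holds the UTF-8 serialized filter and that BloomFilterFactory.fromString() accepts the stored type code (the getter names follow the avsc in this PR):
    
        HoodieMetadataBloomFilter bloomFilterRecord = bloomFilterMetadata.get();
        // Decode the stored bytes back into the serialized filter string.
        String serializedFilter = StandardCharsets.UTF_8.decode(bloomFilterRecord.getBloomFilter()).toString();
        // Instantiate the concrete filter implementation using the persisted type code.
        BloomFilter bloomFilter = BloomFilterFactory.fromString(serializedFilter, bloomFilterRecord.getType().toString());
        boolean maybePresent = bloomFilter.mightContain("someRecordKey");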

##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -133,21 +138,43 @@ public BloomFilter readBloomFilter() {
 
   @Override
   public Set<String> filterRowKeys(Set candidateRowKeys) {
-    // Current implementation reads all records and filters them. In certain cases, it many be better to:
-    //  1. Scan a limited subset of keys (min/max range of candidateRowKeys)
-    //  2. Lookup keys individually (if the size of candidateRowKeys is much less than the total keys in file)
     try {
-      List<Pair<String, R>> allRecords = readAllRecords();
-      Set<String> rowKeys = new HashSet<>();
-      allRecords.forEach(t -> {
-        if (candidateRowKeys.contains(t.getFirst())) {
-          rowKeys.add(t.getFirst());
-        }
-      });
-      return rowKeys;
+      return filterKeys(new TreeSet<>(candidateRowKeys));
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to read row keys from " + path, e);
+      LOG.error("Failed to fetch keys from " + path);
+    }
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<String> filterKeys(TreeSet<String> sortedCandidateRowKeys) throws IOException {
+    return filterRecordsImpl(sortedCandidateRowKeys).keySet();

Review comment:
       There is a faster way to simply filter keys: when seeking to a key, the seek fails if the key does not exist, so there is no need to read the entire record.
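    
    A minimal sketch of the existence-only check, assuming the usual HBase HFileScanner semantics where seekTo() returns 0 only on an exact key match (so the value/record never has to be deserialized):
    
        HFileScanner keyScanner = reader.getScanner(false, false);
        Set<String> foundKeys = new TreeSet<>();
        for (String candidateKey : sortedCandidateRowKeys) {
          KeyValue kv = new KeyValue(candidateKey.getBytes(StandardCharsets.UTF_8), null, null, null);
          // seekTo == 0 means the exact key exists in the file; no record read is needed.
          if (keyScanner.seekTo(kv) == 0) {
            foundKeys.add(candidateKey);
          }
        }
        return foundKeys;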

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +144,89 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
   /**
    * Load all involved files as <Partition, filename> pair List.
    */
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
       List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get BloomIndexFileInfo for all the latest base files for the requested partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    // also obtain file ranges, if range pruning is enabled
+    context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices");
+
+    final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    return context.flatMap(partitions, partitionName -> {
+      List<String> partitionFileNameList = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,

Review comment:
       Is this also coming from the metadata table itself?
   
   

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
##########
@@ -150,18 +152,30 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its record key.
    */
-  private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
-
-    return records.stream().map(r -> {
-      FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
-      final String instantTime = slice.isEmpty() ? "I" : "U";
-      r.setCurrentLocation(new HoodieRecordLocation(instantTime, slice.getFileId()));
-      return r;
-    }).collect(Collectors.toList());
+  private List<HoodieRecord> prepRecords(Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {

Review comment:
       Can we move this function to the base class, as it does not seem to have any Flink-specific handling?
    
    HoodieData allows iterating over all records to tag their locations.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -460,7 +478,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
         .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
 
     initTableMetadata();
-    initializeFileGroups(dataMetaClient, MetadataPartitionType.FILES, createInstantTime, 1);
+    initializeEnabledFileGroups(dataMetaClient, createInstantTime);

Review comment:
       Might be better to do this when bootstrapping each individual type. For existing tables, the various partition types may be enabled one at a time. Either way, we will need to handle enabling indexes after the metadata table has already been created, so having a single way to initialize may save duplication.
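    
    A sketch of what per-type initialization could look like, so an index enabled later on an existing metadata table can be bootstrapped without touching the others (metadataPartitionExists() is a hypothetical existence check, not an existing method):
    
        for (MetadataPartitionType partitionType : enabledPartitionTypes) {
          // Skip partitions that were already initialized by an earlier bootstrap.
          if (!metadataPartitionExists(partitionType)) {
            initializeFileGroups(dataMetaClient, partitionType, createInstantTime, partitionType.getFileGroupCount());
          }
        }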

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1444,6 +1444,14 @@ public boolean useBloomIndexBucketizedChecking() {
     return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
   }
 
+  public boolean isMetadataIndexBloomFilterEnabled() {

Review comment:
       More descriptive if you rename it to isMetadataBloomFilterIndexEnabled.
    
    It is a BloomFilterIndex, so move the word "Index" to the end.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
##########
@@ -31,8 +33,8 @@
   protected final FileSystem fs;
   protected final HoodieTable<T, I, K, O> hoodieTable;
 
-  HoodieIOHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable) {
-    this.instantTime = instantTime;
+  HoodieIOHandle(HoodieWriteConfig config, Option<String> instantTime, HoodieTable<T, I, K, O> hoodieTable) {
+    this.instantTime = instantTime.orElse(StringUtils.EMPTY_STRING);

Review comment:
       Does this have any side effects for the various handles that extend HoodieIOHandle? What does an empty instantTime mean for those handles?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -128,8 +130,21 @@
     this.dataWriteConfig = writeConfig;
     this.engineContext = engineContext;
     this.hadoopConf = new SerializableConfiguration(hadoopConf);
+    this.metrics = Option.empty();
+    this.enabledPartitionTypes = new ArrayList<>();
 
     if (writeConfig.isMetadataTableEnabled()) {
+      this.enabledPartitionTypes.add(MetadataPartitionType.FILES);
+      if (writeConfig.getMetadataConfig().isMetadataIndexBloomFilterEnabled()) {
+        MetadataPartitionType.BLOOM_FILTERS.setFileGroupCount(

Review comment:
       This has two problems that I see:
    1. What if the config setting is changed after the user has already created the file groups?
    2. How does the user find the correct setting to begin with?
    
    I suggest that:
    1. Once the file group count has been used to initialize the file groups, we don't allow it to be changed. Hence, we get that value from the actual partitions in the metadata table rather than from configs.
    2. This also frees the user from having to specify a value to initialize the file groups. I see no easy way to estimate the correct value when there are a large number of files in an existing dataset.

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,108 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"

Review comment:
       Can this be an int? (The doc says "Bloom filter type code", so it is probably an int.) Otherwise, change the doc to "Bloom filter type".

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,108 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Instant timestamp when this metadata was created/updated",
+                            "name": "timestamp",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Bloom filter binary byte array",
+                            "name": "bloomFilter",
+                            "type": "bytes"
+                        },
+                        {
+                            "doc": "Bloom filter entry valid/deleted flag",
+                            "name": "isDeleted",
+                            "type": "boolean"
+                        }
+                    ]
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of column ranges for all data files in the user table",
+            "name": "ColumnStatsMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file column ranges details",
+                    "name": "HoodieColumnStats",
+                    "type": "record",
+                    "fields": [
+                        {
+                            "doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type",
+                            "name": "minValue",
+                            "type": [
+                                "null",
+                                "string"
+                            ]
+                        },
+                        {
+                            "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
+                            "name": "maxValue",
+                            "type": [
+                                "null",
+                                "string"
+                            ]
+                        },
+                        {
+                            "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
+                            "name": "nullCount",

Review comment:
       How is nullCount useful for column stats filtering?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -171,17 +172,34 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
 
   /**
    * Tag each record with the location in the given partition.
-   *
+   * <p>
    * The record is tagged with respective file slice's location based on its record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
-
-    return recordsRDD.map(r -> {
-      FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
-      r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
-      return r;
-    });
+  private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
+    // The result set
+    JavaRDD<HoodieRecord> rddAllPartitionRecords = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext().emptyRDD();
+
+    for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : partitionRecordsMap.entrySet()) {
+      final String partitionName = entry.getKey().getPartitionPath();
+      final int fileGroupCount = entry.getKey().getFileGroupCount();

Review comment:
       I feel this should not be required to be specified simply because it is hard to correctly estimate it across a wide range of datasets. 
   
   We already have the invariant that this value should always match the file group count found via getPartitionLatestFileSlices().
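    
    A short sketch of deriving the count from the metadata table itself rather than from a config, assuming the partition's file groups have already been bootstrapped:
    
        List<FileSlice> fileSlices =
            HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
        // The partition must already have been bootstrapped with its file groups.
        ValidationUtils.checkState(!fileSlices.isEmpty());
        final int fileGroupCount = fileSlices.size();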

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -63,7 +72,9 @@
   // Directory used for Spillable Map when merging records
   protected final String spillableMapDirectory;
 
-  protected boolean enabled;
+  protected boolean isMetadataTableEnabled;
+  protected boolean isMetaIndexBloomFilterEnabled = false;

Review comment:
       1. This will be hard to keep updating as more and more indexes are added.
    2. I didn't see where these are set to true.
    3. Just enabling them is not enough, as the bootstrap must also have taken place.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -157,6 +197,26 @@ public boolean enabled() {
     return getBoolean(ENABLE);
   }
 
+  public boolean isMetadataIndexBloomFilterEnabled() {
+    return getBooleanOrDefault(ENABLE_METADATA_INDEX_BLOOM_FILTER);
+  }
+
+  public boolean isMetadataIndexColumnStatsEnabled() {
+    return getBooleanOrDefault(ENABLE_METADATA_INDEX_COLUMN_STATS);
+  }
+
+  public boolean isMetadataIndexColumnStatsForAllColumns() {

Review comment:
       Add an "Enabled" suffix here, for consistency with the other configs.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -199,6 +259,16 @@ public Builder enable(boolean enable) {
       return this;
     }
 
+    public Builder withMetadataIndexBloomFilter(boolean enable) {

Review comment:
       Since this is MetadataConfig, it is implied that all of these are metadata-related. So you may drop the word "Metadata" from these function names to shorten them.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -157,6 +197,26 @@ public boolean enabled() {
     return getBoolean(ENABLE);
   }
 
+  public boolean isMetadataIndexBloomFilterEnabled() {

Review comment:
       Since this is MetadataConfig, it is implied that all of these are metadata-related. So you may drop the word "Metadata" from these function names to shorten them.

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,108 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Instant timestamp when this metadata was created/updated",
+                            "name": "timestamp",

Review comment:
       Schema evolution allows you to add such a field in the future too.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,119 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final String fileName)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filters is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isMetaIndexBloomFilterEnabled) {
+      LOG.error("Meta index for bloom filter is disabled!");
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+              .concat(new FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+          fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+        }
+    );
+
+    List<String> partitionIDFileIDStrings = new ArrayList<>(partitionIDFileIDSortedStrings);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));

Review comment:
       This is the total time to look up N bloom filters. We should probably save the average time here so that this value is useful for comparison across runs.
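    
    One possible way to record a per-key average instead of the total, reusing the existing updateMetrics(String, long) call from this block:
    
        final long totalDurationMs = timer.endTimer();
        // Guard against an empty lookup list before averaging.
        final long avgDurationMs = partitionNameFileNameList.isEmpty()
            ? 0L : totalDurationMs / partitionNameFileNameList.size();
        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, avgDurationMs));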

##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
##########
@@ -35,6 +37,14 @@
 
   public Set<String> filterRowKeys(Set<String> candidateRowKeys);
 
+  default Map<String, R> getRecordsByKeys(TreeSet<String> sortedCandidateRowKeys) throws IOException {

Review comment:
       Sorting keys is only useful for the HFile reader. Since this is an interface, it should only impose the common functionality. I feel List<String> is better suited here, and HoodieHFileReader should sort the keys internally.
    
    Additionally, wherever these keys are read from will most probably return a List<String>, so there is already a step somewhere that creates a TreeSet out of a List<>, which is an additional copy. Having HoodieHFileReader sort internally is therefore no worse, and seems cleaner to me.
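    
    A sketch of the interface shape this suggests, with sorting pushed down into the HFile-backed reader (method names mirror this PR and are not the final API):
    
        // In HoodieFileReader (the interface): accept the natural List form.
        Map<String, R> getRecordsByKeys(List<String> candidateRowKeys) throws IOException;
    
        // In HoodieHFileReader: sort once, right next to the seek-based scan that needs ordering.
        @Override
        public Map<String, R> getRecordsByKeys(List<String> candidateRowKeys) throws IOException {
          return filterRecordsImpl(new TreeSet<>(candidateRowKeys));
        }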




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org