Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/11 04:02:01 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #4352: [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

vinothchandar commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r781737678



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +170,75 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     }
   }
 
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    if (config.getBloomIndexPruneByRanges()) {
+      // also obtain file ranges, if range pruning is enabled
+      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+
+      final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      return context.flatMap(partitions, new SerializableFunction<String, Stream<Pair<String, BloomIndexFileInfo>>>() {
+        @Override
+        public Stream<Pair<String, BloomIndexFileInfo>> apply(String partitionName) throws Exception {
+          final String columnIndexID = new ColumnIndexID(keyField).asBase64EncodedString();

Review comment:
       Need a deeper look, but let's use `id` in naming only when it is actually the ID and not a field name.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.

Review comment:
       Fix the doc to describe the metadata-table-specific details.
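
       Something along these lines, perhaps (wording is just a suggestion, not final):

```java
/**
 * Suggested shape (illustrative only): Looks up a batch of record keys against
 * the bloom filter served from the metadata table's bloom filter index partition
 * (instead of the base file footer), and returns the candidate keys that may be
 * present in the file group.
 */
```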

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +170,75 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     }
   }
 
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    if (config.getBloomIndexPruneByRanges()) {
+      // also obtain file ranges, if range pruning is enabled
+      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+
+      final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      return context.flatMap(partitions, new SerializableFunction<String, Stream<Pair<String, BloomIndexFileInfo>>>() {

Review comment:
       Let's write a lambda here instead of an anonymous class?
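
       For illustration, the two forms side by side; the `SerializableFunction` below is a local stand-in modeled on the one in the diff, only so the snippet compiles on its own:

```java
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Local stand-in for the SerializableFunction used in the diff, so this
// compiles standalone.
@FunctionalInterface
interface SerializableFunction<I, O> extends Serializable {
  O apply(I input) throws Exception;
}

public class LambdaVsAnonymous {
  public static void main(String[] args) throws Exception {
    // Anonymous-class form, as in the diff above:
    SerializableFunction<String, Stream<String>> anon =
        new SerializableFunction<String, Stream<String>>() {
          @Override
          public Stream<String> apply(String partition) {
            return Stream.of(partition + "/f1", partition + "/f2");
          }
        };

    // Equivalent lambda form -- same serializable functional interface, less noise:
    SerializableFunction<String, Stream<String>> lambda =
        partition -> Stream.of(partition + "/f1", partition + "/f2");

    List<String> files = lambda.apply("2021/12/20").collect(Collectors.toList());
    System.out.println(files); // [2021/12/20/f1, 2021/12/20/f2]
  }
}
```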

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexLookupHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T,

Review comment:
       Can we share more code with the existing `LookupHandle`?
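
       For illustration, one possible shape (all class and method names here are placeholders, with a local `BloomFilter` stub so the sketch compiles standalone):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for org.apache.hudi.common.bloom.BloomFilter.
interface BloomFilter {
  boolean mightContain(String key);
}

// The candidate-filtering loop lives in a shared base class; subclasses differ
// only in where the bloom filter comes from.
abstract class BaseKeyLookupHandle {
  protected abstract BloomFilter loadBloomFilter();

  public List<String> filterCandidates(List<String> recordKeys) {
    BloomFilter bloomFilter = loadBloomFilter();
    List<String> candidates = new ArrayList<>();
    for (String key : recordKeys) {
      if (bloomFilter.mightContain(key)) {
        candidates.add(key);
      }
    }
    return candidates;
  }
}

// Existing path: bloom filter parsed from the base file footer.
class FooterKeyLookupHandle extends BaseKeyLookupHandle {
  @Override
  protected BloomFilter loadBloomFilter() {
    return key -> true; // placeholder for the footer read
  }
}

// New path: bloom filter served from the metadata table's bloom filter index.
class MetaIndexKeyLookupHandle extends BaseKeyLookupHandle {
  @Override
  protected BloomFilter loadBloomFilter() {
    return key -> true; // placeholder for the metadata table lookup
  }
}
```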

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexLookupHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T,
+    I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieKeyMetaIndexLookupHandle.class);
+  private final HoodieTableType tableType;
+  private final BloomFilter bloomFilter;
+  private final List<String> candidateRecordKeys;
+  private long totalKeysChecked;
+
+  public HoodieKeyMetaIndexLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
+                                        Pair<String, String> partitionPathFileIdPair, String fileName) {
+    super(config, null, hoodieTable, partitionPathFileIdPair);
+    this.tableType = hoodieTable.getMetaClient().getTableType();
+    this.candidateRecordKeys = new ArrayList<>();
+    this.totalKeysChecked = 0;
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<ByteBuffer> bloomFilterByteBuffer =
+        hoodieTable.getMetadataTable().getBloomFilter(new PartitionIndexID(partitionPathFileIdPair.getLeft()),
+            new FileIndexID(fileName));
+    if (!bloomFilterByteBuffer.isPresent()) {
+      throw new HoodieIndexException("BloomFilter missing for " + fileName);
+    }
+
+    this.bloomFilter =
+        new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
+            BloomFilterTypeCode.DYNAMIC_V0);
+    LOG.debug(String.format("Read bloom filter from %s,%s, size: %s in %d ms", partitionPathFileIdPair, fileName,
+        bloomFilterByteBuffer.get().array().length, timer.endTimer()));
+  }
+
+  /**
+   * Given a list of row keys and one file, return only row keys existing in that file.
+   */
+  public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,

Review comment:
       Can you add a comment on the PR to indicate how much of this file is new vs. copied over?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +170,75 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     }
   }
 
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    if (config.getBloomIndexPruneByRanges()) {
+      // also obtain file ranges, if range pruning is enabled
+      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+
+      final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      return context.flatMap(partitions, new SerializableFunction<String, Stream<Pair<String, BloomIndexFileInfo>>>() {
+        @Override
+        public Stream<Pair<String, BloomIndexFileInfo>> apply(String partitionName) throws Exception {

Review comment:
       We discussed reading all of this from the driver, correct? i.e., fetch the entire list of stats for the key column alone?
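
       If so, the driver-side surface could be as small as one batched call per column; purely a sketch, with hypothetical names and simplified types:

```java
import java.util.List;
import java.util.Map;

// Hypothetical driver-side API: one batched metadata-table read for the record
// key column across all touched partitions, issued before any distributed stage.
// The method name and the String[]{min, max} value shape are illustrative only.
interface KeyColumnStatsReader {
  /** Maps "partition/fileId" to {minKey, maxKey} for the given column. */
  Map<String, String[]> readStatsForColumn(String columnName, List<String> partitions);
}
```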

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +170,75 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     }
   }
 
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    if (config.getBloomIndexPruneByRanges()) {
+      // also obtain file ranges, if range pruning is enabled
+      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+
+      final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      return context.flatMap(partitions, new SerializableFunction<String, Stream<Pair<String, BloomIndexFileInfo>>>() {
+        @Override
+        public Stream<Pair<String, BloomIndexFileInfo>> apply(String partitionName) throws Exception {
+          final String columnIndexID = new ColumnIndexID(keyField).asBase64EncodedString();
+          final String partitionIndexID = new PartitionIndexID(partitionName).asBase64EncodedString();
+
+          List<Pair<String, String>> partitionFileIdList =
+              HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+                      hoodieTable).stream().map(baseFile -> Pair.of(baseFile.getFileId(), baseFile.getFileName()))
+                  .collect(toList());
+          try {
+            Map<String, String> columnStatKeyToFileIdMap = new HashMap<>();
+            List<String> columnStatKeys = new ArrayList<>();
+            for (Pair<String, String> fileIdFileName : partitionFileIdList) {
+              final String columnStatIndexKey = columnIndexID
+                  .concat(partitionIndexID)
+                  .concat(new FileIndexID(fileIdFileName.getRight()).asBase64EncodedString());
+              columnStatKeys.add(columnStatIndexKey);
+              columnStatKeyToFileIdMap.put(columnStatIndexKey, fileIdFileName.getLeft());
+            }
+            if (columnStatKeys.isEmpty()) {
+              return Stream.empty();
+            }
+
+            Collections.sort(columnStatKeys);
+            Map<String, HoodieColumnStats> columnKeyHashToStatMap = hoodieTable
+                .getMetadataTable().getColumnStats(columnStatKeys);

Review comment:
       This code is doing a lot of munging and object conversion. Let's move this down into `getColumnStats()`. Callers of the metadata table's `getXXX()` methods should not be aware of the ID encoding, sorting, and other implementation details.
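
       A sketch of the encapsulated shape, using simplified stand-in types (none of these names are from the PR); the encoding, sorting, and hash-to-name mapping from the diff above all move behind `getColumnStats()`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the avro-generated HoodieColumnStats.
class ColumnStats {
  final String minValue;
  final String maxValue;
  ColumnStats(String minValue, String maxValue) {
    this.minValue = minValue;
    this.maxValue = maxValue;
  }
}

class MetadataTableSketch {
  // Stand-in for the ColumnIndexID/PartitionIndexID/FileIndexID encoding above.
  private String encodeKey(String column, String partition, String fileName) {
    return column + "|" + partition + "|" + fileName; // real impl: base64 ID hashes
  }

  // Stand-in for the existing hash-keyed read of the column stats partition.
  private Map<String, ColumnStats> lookupByEncodedKeys(List<String> sortedKeys) {
    return new HashMap<>();
  }

  /** Callers pass plain names; all of the munging stays internal. */
  public Map<String, ColumnStats> getColumnStats(String column, String partition,
                                                 List<String> fileNames) {
    Map<String, String> encodedToFileName = new HashMap<>();
    List<String> encodedKeys = new ArrayList<>();
    for (String fileName : fileNames) {
      String encoded = encodeKey(column, partition, fileName);
      encodedKeys.add(encoded);
      encodedToFileName.put(encoded, fileName);
    }
    Collections.sort(encodedKeys); // sorted for an efficient metadata table read
    Map<String, ColumnStats> byHash = lookupByEncodedKeys(encodedKeys);
    Map<String, ColumnStats> byFileName = new HashMap<>();
    byHash.forEach((k, v) -> byFileName.put(encodedToFileName.get(k), v));
    return byFileName;
  }
}
```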

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -124,6 +124,37 @@
       .sinceVersion("0.10.0")
       .withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");
 
+  public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER = ConfigProperty

Review comment:
       You are assuming it is always the key field that the bloom filter points to. We also need to take another config where the user can specify the list of columns/fields to track bloom filters for.
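
       Roughly along these lines, following the `ConfigProperty` builder pattern already used in this file; the key name, default, and version below are placeholders, not part of this PR:

```java
// Placeholder sketch only: key name, default, and sinceVersion are assumptions.
public static final ConfigProperty<String> META_INDEX_BLOOM_FILTER_COLUMNS = ConfigProperty
    .key("hoodie.metadata.index.bloom.filter.columns")
    .defaultValue("")
    .sinceVersion("0.11.0")
    .withDocumentation("Comma-separated list of columns/fields to maintain bloom filters for "
        + "in the metadata index. When unset, only the record key field is tracked.");
```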

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +173,71 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
     }
   }
 
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    if (config.getBloomIndexPruneByRanges()) {
+      // also obtain file ranges, if range pruning is enabled
+      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+
+      final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      return context.flatMap(partitions, new SerializableFunction<String, Stream<Pair<String, BloomIndexFileInfo>>>() {
+        @Override
+        public Stream<Pair<String, BloomIndexFileInfo>> apply(String partitionName) throws Exception {
+          final String columnIndexID = new ColumnIndexID(keyField).asBase64EncodedString();
+          final String partitionIndexID = new PartitionIndexID(partitionName).asBase64EncodedString();
+
+          List<Pair<String, String>> partitionFileIdList =
+              HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+                      hoodieTable).stream().map(baseFile -> Pair.of(baseFile.getFileId(), baseFile.getFileName()))
+                  .collect(toList());
+          try {
+            Map<String, String> columnStatKeyToFileIdMap = new HashMap<>();
+            List<String> columnStatKeys = new ArrayList<>();
+            for (Pair<String, String> fileIdFileName : partitionFileIdList) {
+              final String columnStatIndexKey = columnIndexID
+                  .concat(partitionIndexID)
+                  .concat(new FileIndexID(fileIdFileName.getLeft()).asBase64EncodedString());
+              columnStatKeys.add(columnStatIndexKey);
+              columnStatKeyToFileIdMap.put(columnStatIndexKey, fileIdFileName.getLeft());
+            }
+            Collections.sort(columnStatKeys);
+
+            Map<String, HoodieColumnStats> columnKeyHashToStatMap = hoodieTable
+                .getMetadataTable().getColumnStats(columnStatKeys);
+            List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
+            for (Map.Entry<String, HoodieColumnStats> entry : columnKeyHashToStatMap.entrySet()) {
+              result.add(Pair.of(partitionName,
+                  new BloomIndexFileInfo(
+                      columnStatKeyToFileIdMap.get(entry.getKey()),
+                      entry.getValue().getMinValue(),
+                      entry.getValue().getMaxValue()
+                  )));
+            }
+            return result.stream();
+          } catch (MetadataNotFoundException me) {
+            throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me);
+          }
+        }
+      }, Math.max(partitions.size(), 1));
+    } else {
+      // Obtain the latest data files from all the partitions.
+      List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context,

Review comment:
       +1 

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexLookupHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T,
+    I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieKeyMetaIndexLookupHandle.class);
+  private final HoodieTableType tableType;
+  private final BloomFilter bloomFilter;
+  private final List<String> candidateRecordKeys;
+  private long totalKeysChecked;
+
+  public HoodieKeyMetaIndexLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
+                                        Pair<String, String> partitionPathFileIdPair, String fileId) {
+    super(config, null, hoodieTable, partitionPathFileIdPair);
+    this.tableType = hoodieTable.getMetaClient().getTableType();
+    this.candidateRecordKeys = new ArrayList<>();
+    this.totalKeysChecked = 0;
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<ByteBuffer> bloomFilterByteBuffer =

Review comment:
       +1. It's duplicating a lot at this point.

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,118 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Instant timestamp when this metadata was created/updated",
+                            "name": "timestamp",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Bloom filter binary byte array",
+                            "name": "bloomFilter",
+                            "type": "bytes"
                         },
                         {
+                            "doc": "Bloom filter entry valid/deleted flag",
                             "name": "isDeleted",
-                            "type": "boolean",
-                            "doc": "True if this file has been deleted"
+                            "type": "boolean"
+                        },
+                        {
+                            "doc": "Reserved bytes for future use",
+                            "name": "reserved",

Review comment:
       We can evolve the schema, right?
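
       A quick standalone check of that point, with trimmed-down stand-in schemas: adding a field with a default is a backward-compatible Avro evolution, so a reserved placeholder field shouldn't be required:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class EvolutionCheck {
  public static void main(String[] args) {
    // v1: no nullCount; v2 adds it with a null default.
    Schema v1 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"HoodieColumnStats\",\"fields\":["
            + "{\"name\":\"minValue\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
    Schema v2 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"HoodieColumnStats\",\"fields\":["
            + "{\"name\":\"minValue\",\"type\":[\"null\",\"string\"],\"default\":null},"
            + "{\"name\":\"nullCount\",\"type\":[\"null\",\"long\"],\"default\":null}]}");

    // v2 readers can consume v1 data: the new field just takes its default.
    System.out.println(SchemaCompatibility
        .checkReaderWriterCompatibility(v2, v1).getType()); // COMPATIBLE
  }
}
```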

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,118 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Instant timestamp when this metadata was created/updated",
+                            "name": "timestamp",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Bloom filter binary byte array",
+                            "name": "bloomFilter",
+                            "type": "bytes"
                         },
                         {
+                            "doc": "Bloom filter entry valid/deleted flag",
                             "name": "isDeleted",
-                            "type": "boolean",
-                            "doc": "True if this file has been deleted"
+                            "type": "boolean"
+                        },
+                        {
+                            "doc": "Reserved bytes for future use",
+                            "name": "reserved",
+                            "type": "bytes"
+                        }
+                    ]
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of column ranges for all data files in the user table",
+            "name": "ColumnStatsMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file column ranges details",
+                    "name": "HoodieColumnStats",
+                    "type": "record",
+                    "fields": [
+                        {
+                            "doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type",
+                            "name": "minValue",
+                            "type": [
+                                "null",
+                                "string"
+                            ]
+                        },
+                        {
+                            "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
+                            "name": "maxValue",
+                            "type": [
+                                "null",
+                                "string"
+                            ]
+                        },
+                        {
+                            "doc": "Number of null values for the column in this file",

Review comment:
       Yes, let's get them all added in one shot.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexLookupHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T,
+    I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieKeyMetaIndexLookupHandle.class);
+  private final HoodieTableType tableType;
+  private final BloomFilter bloomFilter;
+  private final List<String> candidateRecordKeys;
+  private long totalKeysChecked;
+
+  public HoodieKeyMetaIndexLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
+                                        Pair<String, String> partitionPathFileIdPair, String fileName) {
+    super(config, null, hoodieTable, partitionPathFileIdPair);
+    this.tableType = hoodieTable.getMetaClient().getTableType();
+    this.candidateRecordKeys = new ArrayList<>();
+    this.totalKeysChecked = 0;
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<ByteBuffer> bloomFilterByteBuffer =
+        hoodieTable.getMetadataTable().getBloomFilter(new PartitionIndexID(partitionPathFileIdPair.getLeft()),

Review comment:
       Again, let's up-level these APIs.
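
       i.e., something shaped like this (hypothetical interface, with `java.util.Optional` standing in for Hudi's `Option`):

```java
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical up-leveled surface: callers pass plain partition/file names, and
// the PartitionIndexID/FileIndexID construction becomes an implementation detail
// of the metadata table.
interface BloomFilterIndexSurface {
  Optional<ByteBuffer> getBloomFilter(String partitionName, String fileName);
}
```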

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupResult.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import java.util.List;
+
+/**
+ * Encapsulates the result from a key lookup.
+ */
+public class HoodieKeyMetaIndexLookupResult {

Review comment:
       Again, can we reuse code here?

##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,118 @@
             "doc": "Type of the metadata record",
             "type": "int"
         },
-        {   "name": "filesystemMetadata",
+        {
             "doc": "Contains information about partitions and files within the dataset",
-            "type": ["null", {
-               "type": "map",
-               "values": {
+            "name": "filesystemMetadata",
+            "type": [
+                "null",
+                {
+                    "type": "map",
+                    "values": {
+                        "type": "record",
+                        "name": "HoodieMetadataFileInfo",
+                        "fields": [
+                            {
+                                "name": "size",
+                                "type": "long",
+                                "doc": "Size of the file"
+                            },
+                            {
+                                "name": "isDeleted",
+                                "type": "boolean",
+                                "doc": "True if this file has been deleted"
+                            }
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of bloom filters for all data files in the user table",
+            "name": "BloomFilterMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file bloom filter details",
+                    "name": "HoodieMetadataBloomFilter",
                     "type": "record",
-                    "name": "HoodieMetadataFileInfo",
                     "fields": [
                         {
-                            "name": "size",
-                            "type": "long",
-                            "doc": "Size of the file"
+                            "doc": "Bloom filter type code",
+                            "name": "type",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Instant timestamp when this metadata was created/updated",
+                            "name": "timestamp",
+                            "type": "string"
+                        },
+                        {
+                            "doc": "Bloom filter binary byte array",
+                            "name": "bloomFilter",
+                            "type": "bytes"
                         },
                         {
+                            "doc": "Bloom filter entry valid/deleted flag",
                             "name": "isDeleted",
-                            "type": "boolean",
-                            "doc": "True if this file has been deleted"
+                            "type": "boolean"
+                        },
+                        {
+                            "doc": "Reserved bytes for future use",
+                            "name": "reserved",
+                            "type": "bytes"
+                        }
+                    ]
+                }
+            ]
+        },
+        {
+            "doc": "Metadata Index of column ranges for all data files in the user table",
+            "name": "ColumnStatsMetadata",
+            "type": [
+                "null",
+                {
+                    "doc": "Data file column ranges details",
+                    "name": "HoodieColumnStats",
+                    "type": "record",
+                    "fields": [
+                        {
+                            "doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type",
+                            "name": "minValue",
+                            "type": [
+                                "null",
+                                "string"
+                            ]
+                        },
+                        {
+                            "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
+                            "name": "maxValue",
+                            "type": [
+                                "null",
+                                "string"
+                            ]
+                        },
+                        {
+                            "doc": "Number of null values for the column in this file",
+                            "name": "nullCount",
+                            "type": [
+                                "null",
+                                "long"
+                            ]
+                        },
+                        {
+                            "doc": "Column range entry valid/deleted flag",
+                            "name": "isDeleted",
+                            "type": "boolean"
+                        },
+                        {
+                            "doc": "Reserved bytes for future use",

Review comment:
       Again, why do we need this?

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
##########
@@ -101,10 +102,11 @@ protected void initRegistry() {
   }
 
   @Override
-  protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
+  protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap,

Review comment:
       We should see if we can reduce the code duplication here between the Flink and Spark writers, now that we have the `HoodieData` abstraction. Land it in a separate PR.
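
       A rough sketch of the shape that consolidation could take, with stand-in types (`HoodieData` here is a simplified local stub, and the class/method names are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified local stub for hudi's engine-agnostic HoodieData<T>.
interface HoodieData<T> {
  <O> HoodieData<O> map(Function<T, O> mapper);
  List<T> collectAsList();
}

// The per-partition commit loop is identical across engines, so it can live in
// the shared base; Spark/Flink subclasses keep only the engine-specific write.
abstract class BaseMetadataWriterSketch<R> {
  protected abstract void writeToPartition(String metadataPartition,
                                           HoodieData<R> records, String instantTime);

  public final void commit(String instantTime, Map<String, HoodieData<R>> partitionRecords) {
    partitionRecords.forEach((partition, records) ->
        writeToPartition(partition, records, instantTime));
  }
}
```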

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -109,6 +109,7 @@
   protected boolean enabled;
   protected SerializableConfiguration hadoopConf;
   protected final transient HoodieEngineContext engineContext;
+  protected final List<MetadataPartitionType> enabledPartitionTypes;

Review comment:
       We should track `enabledPartitions`, not just partition types, i.e., this layer should let writers create multiple bloom filter partitions, to track, say, a secondary key's bloom filters as well.
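
       For example (illustrative names and stand-in types only, not the PR's):

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for hudi's MetadataPartitionType enum.
enum MetadataPartitionType { FILES, BLOOM_FILTERS, COLUMN_STATS }

// Tracking enabled partition *instances* rather than types, so two bloom filter
// partitions (record key plus a secondary key) can coexist.
class EnabledMetadataPartition {
  final MetadataPartitionType type;
  final String partitionName; // physical partition in the metadata table
  final String trackedField;  // record key field, or a secondary key

  EnabledMetadataPartition(MetadataPartitionType type, String partitionName, String trackedField) {
    this.type = type;
    this.partitionName = partitionName;
    this.trackedField = trackedField;
  }
}

class Example {
  // A List<MetadataPartitionType> alone cannot express this:
  static final List<EnabledMetadataPartition> ENABLED = Arrays.asList(
      new EnabledMetadataPartition(MetadataPartitionType.BLOOM_FILTERS, "bloom_filters_key", "uuid"),
      new EnabledMetadataPartition(MetadataPartitionType.BLOOM_FILTERS, "bloom_filters_email", "email"));
}
```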



