You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/18 11:17:13 UTC

[GitHub] [hudi] codope commented on a change in pull request #4352: [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

codope commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r786556671



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +144,89 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
   /**
    * Load all involved files as <Partition, filename> pair List.
    */
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
       List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get the latest base files for the requested partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> getLatestBaseFilesForPartitions(
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(

Review comment:
       Can this method be private?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -46,52 +50,54 @@
 
   private static final Logger LOG = LogManager.getLogger(HoodieKeyLookupHandle.class);
 
-  private final HoodieTableType tableType;
-
   private final BloomFilter bloomFilter;
-
   private final List<String> candidateRecordKeys;
-
+  private final boolean useMetadataTableIndex;
+  private Option<String> fileName = Option.empty();
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                               Pair<String, String> partitionPathFilePair) {
-    super(config, null, hoodieTable, partitionPathFilePair);
-    this.tableType = hoodieTable.getMetaClient().getTableType();
+                               Pair<String, String> partitionPathFileIDPair) {
+    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
+  }
+
+  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
+                               Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
+                               boolean useMetadataTableIndex) {
+    super(config, null, hoodieTable, partitionPathFileIDPair);

Review comment:
       I know this is not due to your change, but can we take up replacing `null` with `Option.empty()` in this PR? If not, then we should at least track it as a separate task. I see a lot of places where we're doing this without proper checks, which causes NPEs.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -111,13 +116,19 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
   private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
       HoodiePairData<String, String> partitionRecordKeyPairs, final HoodieEngineContext context,
       final HoodieTable hoodieTable) {
-    // Obtain records per partition, in the incoming records
+    // Step 1: Obtain records per partition, in the incoming records
     Map<String, Long> recordsPerPartition = partitionRecordKeyPairs.countByKey();
     List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
 
     // Step 2: Load all involved files as <Partition, filename> pairs
-    List<Pair<String, BloomIndexFileInfo>> fileInfoList =
-        loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
+    List<Pair<String, BloomIndexFileInfo>> fileInfoList;
+    if (config.getBloomIndexPruneByRanges()) {
+      fileInfoList = (config.getMetadataConfig().isMetaIndexColumnStatsEnabled()
+          ? loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable)
+          : loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable));
+    } else {
+      fileInfoList = getLatestBaseFilesForPartitions(affectedPartitionPathList, context, hoodieTable);
+    }

Review comment:
       Instead of branching here, why not branch in `loadColumnRangesFromFiles`? That way it will apply to subclasses too.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
##########
@@ -22,27 +22,53 @@
 import java.util.List;
 
 public enum MetadataPartitionType {
-  FILES("files", "files-");
+  // TODO: Make the file group count configurable or auto compute based on the scaling needs

Review comment:
       If the file group count is going to be dynamic in the future, then let's not keep it in the enum. For now, we can just define static constants in `HoodieTableMetadataUtil`. What do you think?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -101,4 +116,34 @@ public static HoodieRecord getTaggedRecord(HoodieRecord inputRecord, Option<Hood
     }
     return record;
   }
+
+  /**
+   * Given a list of row keys and one file, return only row keys existing in that file.
+   *
+   * @param filePath            - File to filter keys from
+   * @param candidateRecordKeys - Candidate keys to filter
+   * @return List of candidate keys that are available in the file
+   */
+  public static List<String> filterKeysFromFile(Path filePath, List<String> candidateRecordKeys,
+                                                Configuration configuration) throws HoodieIndexException {
+    ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
+    List<String> foundRecordKeys = new ArrayList<>();
+    try {
+      // Load all rowKeys from the file, to double-confirm
+      if (!candidateRecordKeys.isEmpty()) {
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
+        Set<String> fileRowKeys = fileReader.filterKeys(new TreeSet<>(candidateRecordKeys));
+        foundRecordKeys.addAll(fileRowKeys);
+        LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
+            timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
+        if (LOG.isDebugEnabled()) {

Review comment:
       Is it necessary to check `isDebugEnabled`? Looks redundant given that the next line is logging in debug mode.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -46,52 +50,54 @@
 
   private static final Logger LOG = LogManager.getLogger(HoodieKeyLookupHandle.class);
 
-  private final HoodieTableType tableType;
-
   private final BloomFilter bloomFilter;
-
   private final List<String> candidateRecordKeys;
-
+  private final boolean useMetadataTableIndex;
+  private Option<String> fileName = Option.empty();
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                               Pair<String, String> partitionPathFilePair) {
-    super(config, null, hoodieTable, partitionPathFilePair);
-    this.tableType = hoodieTable.getMetaClient().getTableType();
+                               Pair<String, String> partitionPathFileIDPair) {
+    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
+  }
+
+  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
+                               Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
+                               boolean useMetadataTableIndex) {
+    super(config, null, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    HoodieTimer timer = new HoodieTimer().startTimer();
-
-    try {
-      this.bloomFilter = createNewFileReader().readBloomFilter();
-    } catch (IOException e) {
-      throw new HoodieIndexException(String.format("Error reading bloom filter from %s: %s", partitionPathFilePair, e));
+    if (fileName.isPresent()) {
+      ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()));
+      this.fileName = fileName;
     }
-    LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer()));
+    this.useMetadataTableIndex = useMetadataTableIndex;
+    this.bloomFilter = getBloomFilter();
   }
 
-  /**
-   * Given a list of row keys and one file, return only row keys existing in that file.
-   */
-  public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
-                                                 Path filePath) throws HoodieIndexException {
-    List<String> foundRecordKeys = new ArrayList<>();
+  private BloomFilter getBloomFilter() {
+    BloomFilter bloomFilter = null;
+    HoodieTimer timer = new HoodieTimer().startTimer();
     try {
-      // Load all rowKeys from the file, to double-confirm
-      if (!candidateRecordKeys.isEmpty()) {
-        HoodieTimer timer = new HoodieTimer().startTimer();
-        Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
-        foundRecordKeys.addAll(fileRowKeys);
-        LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
-            timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
+      if (this.useMetadataTableIndex) {
+        ValidationUtils.checkArgument(this.fileName.isPresent());

Review comment:
       Let's add a meaningful message for all `ValidationUtils.checkArgument` calls.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -46,52 +50,54 @@
 
   private static final Logger LOG = LogManager.getLogger(HoodieKeyLookupHandle.class);
 
-  private final HoodieTableType tableType;
-
   private final BloomFilter bloomFilter;
-
   private final List<String> candidateRecordKeys;
-
+  private final boolean useMetadataTableIndex;
+  private Option<String> fileName = Option.empty();
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                               Pair<String, String> partitionPathFilePair) {
-    super(config, null, hoodieTable, partitionPathFilePair);
-    this.tableType = hoodieTable.getMetaClient().getTableType();
+                               Pair<String, String> partitionPathFileIDPair) {
+    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
+  }
+
+  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
+                               Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
+                               boolean useMetadataTableIndex) {
+    super(config, null, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    HoodieTimer timer = new HoodieTimer().startTimer();
-
-    try {
-      this.bloomFilter = createNewFileReader().readBloomFilter();
-    } catch (IOException e) {
-      throw new HoodieIndexException(String.format("Error reading bloom filter from %s: %s", partitionPathFilePair, e));
+    if (fileName.isPresent()) {
+      ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()));
+      this.fileName = fileName;
     }
-    LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer()));
+    this.useMetadataTableIndex = useMetadataTableIndex;
+    this.bloomFilter = getBloomFilter();
   }
 
-  /**
-   * Given a list of row keys and one file, return only row keys existing in that file.
-   */
-  public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
-                                                 Path filePath) throws HoodieIndexException {
-    List<String> foundRecordKeys = new ArrayList<>();
+  private BloomFilter getBloomFilter() {
+    BloomFilter bloomFilter = null;
+    HoodieTimer timer = new HoodieTimer().startTimer();
     try {
-      // Load all rowKeys from the file, to double-confirm
-      if (!candidateRecordKeys.isEmpty()) {
-        HoodieTimer timer = new HoodieTimer().startTimer();
-        Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
-        foundRecordKeys.addAll(fileRowKeys);
-        LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
-            timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
+      if (this.useMetadataTableIndex) {
+        ValidationUtils.checkArgument(this.fileName.isPresent());
+        Option<ByteBuffer> bloomFilterByteBuffer =
+            hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get());
+        if (!bloomFilterByteBuffer.isPresent()) {
+          throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight());

Review comment:
       Why not log a warn/error message and fall back to `createNewFileReader().readBloomFilter()`?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -62,12 +76,37 @@
 
   private static final Logger LOG = LogManager.getLogger(HoodieTableMetadataUtil.class);
 
+  protected static final String PARTITION_NAME_FILES = "files";
+  protected static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
+  protected static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
+
+  /**
+   * Get the metadata partition type from the partition path.
+   *
+   * @param partitionPath - Partition path to get the type for
+   * @return Partition type for the path
+   */
+  public static Option<MetadataPartitionType> fromPartitionPath(final String partitionPath) {

Review comment:
       Instead of having a separate util method here, I would rather have this in the enum itself.

##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
##########
@@ -35,6 +37,14 @@
 
   public Set<String> filterRowKeys(Set<String> candidateRowKeys);
 
+  default Map<String, R> getRecordsByKeys(TreeSet<String> sortedCandidateRowKeys) throws IOException {

Review comment:
       How is TreeSet helpful for these APIs?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexLazyCheckFunction.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.io.HoodieKeyLookupHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in a lazy fashion of file by file.
+ */
+public class HoodieBloomMetaIndexLazyCheckFunction implements Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBloomMetaIndexLazyCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexLazyCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    return new LazyKeyCheckIterator(tuple2Iterator);
+  }
+
+  class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<HoodieKeyLookupResult>> {
+
+    private HoodieKeyLookupHandle keyLookupHandle;
+
+    LazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
+      super(filePartitionRecordKeyTripletItr);
+    }
+
+    @Override
+    protected void start() {
+    }
+
+    @Override
+    protected List<HoodieKeyLookupResult> computeNext() {
+
+      List<HoodieKeyLookupResult> ret = new ArrayList<>();
+      try {
+        final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+        while (inputItr.hasNext()) {
+          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
+          final String partitionPath = currentTuple._2.getPartitionPath();
+          final String recordKey = currentTuple._2.getRecordKey();
+          final String fileId = currentTuple._1;
+          ValidationUtils.checkState(!fileId.isEmpty());
+          if (!fileIDBaseFileMap.containsKey(fileId)) {
+            Option<HoodieBaseFile> baseFile = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);

Review comment:
       Same question regarding base file as the batch check function.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in a batch fashion for all the provided
+ * keys at once.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+    // Partition path and file name pair to list of keys
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      final String partitionPath = entry._2.getPartitionPath();
+      final String fileId = entry._1;
+      if (!fileIDBaseFileMap.containsKey(fileId)) {
+        Option<HoodieBaseFile> baseFile = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);

Review comment:
       Is it possible at any point that there are no base files in the table? What happens then? For example, a MOR table written through kafka-connect contains only log files.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +144,89 @@ public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper blo
   /**
    * Load all involved files as <Partition, filename> pair List.
    */
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
       List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get the latest base files for the requested partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  List<Pair<String, BloomIndexFileInfo>> getLatestBaseFilesForPartitions(

Review comment:
       Can this method be private?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
##########
@@ -174,11 +174,11 @@ protected String getKeyField() {
     return this.simpleKeyGenFields.get().getKey();
   }
 
-  public void scan() {
+  public synchronized void scan() {

Review comment:
       Why does scan have to be synchronized? The metadata table can be accessed in read mode by multiple readers, right?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+    Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+    final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+    while (tuple2Iterator.hasNext()) {
+      Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+      final String partitionPath = entry._2.getPartitionPath();
+      final String fileId = entry._1;
+      if (!fileIDBaseFileMap.containsKey(fileId)) {

Review comment:
       Is it possible for the iterator to have the same fileId multiple times in the same task? If not, then constructing the map may not be necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org