Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/04 09:43:52 UTC

[GitHub] [hudi] codope commented on a change in pull request #4740: [HUDI-3356][HUDI-3203] HoodieData for metadata index records; BloomFilter construction from index based on the type param

codope commented on a change in pull request #4740:
URL: https://github.com/apache/hudi/pull/4740#discussion_r799291405



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -53,56 +46,24 @@
 
   private final BloomFilter bloomFilter;
   private final List<String> candidateRecordKeys;
-  private final boolean useMetadataTableIndex;
-  private Option<String> fileName = Option.empty();
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
                                Pair<String, String> partitionPathFileIDPair) {
-    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
-  }
-
-  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                               Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
-                               boolean useMetadataTableIndex) {
     super(config, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    if (fileName.isPresent()) {
-      ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()),
-          "File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'");
-      this.fileName = fileName;
-    }
-    this.useMetadataTableIndex = useMetadataTableIndex;
     this.bloomFilter = getBloomFilter();
   }
 
   private BloomFilter getBloomFilter() {
-    BloomFilter bloomFilter = null;
     HoodieTimer timer = new HoodieTimer().startTimer();
-    try {
-      if (this.useMetadataTableIndex) {
-        ValidationUtils.checkArgument(this.fileName.isPresent(),
-            "File name not available to fetch bloom filter from the metadata table index.");
-        Option<ByteBuffer> bloomFilterByteBuffer =
-            hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get());
-        if (!bloomFilterByteBuffer.isPresent()) {
-          throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight());
-        }
-        bloomFilter =
-            new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
-                BloomFilterTypeCode.DYNAMIC_V0);
-      } else {
-        try (HoodieFileReader reader = createNewFileReader()) {
-          bloomFilter = reader.readBloomFilter();
-        }
-      }
+    try (HoodieFileReader reader = createNewFileReader()) {
+      LOG.debug(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
+      return reader.readBloomFilter();

Review comment:
      I see you have removed `useMetadataTableIndex`. Will `readBloomFilter()` use the metadata table (MT) bloom index by default?
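One way to make the default explicit is to have the caller supply the filter source. The handle then carries no `useMetadataTableIndex` flag and no hidden default. This is a minimal sketch under that assumption:

- `KeyFilter`, `KeyLookupSketch` and `fromIndex` are hypothetical names, not Hudi APIs.
- The map stands in for a metadata-table lookup.
- Like the removed code path, it fails loudly when the index has no entry for the file.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a bloom filter; only membership testing matters here.
interface KeyFilter {
  boolean mightContain(String key);
}

// Hypothetical sketch: the filter source is injected, so the handle needs no
// useMetadataTableIndex flag and the caller's choice is explicit.
final class KeyLookupSketch {
  private final KeyFilter filter;

  KeyLookupSketch(Supplier<KeyFilter> filterSource) {
    this.filter = filterSource.get();
  }

  // Source backed by a pre-fetched index keyed by file name (stands in for the
  // metadata table); a missing entry is an error, as in the removed code path.
  static Supplier<KeyFilter> fromIndex(Map<String, KeyFilter> index, String fileName) {
    return () -> {
      KeyFilter f = index.get(fileName);
      if (f == null) {
        throw new IllegalStateException("Bloom filter missing for " + fileName);
      }
      return f;
    };
  }

  boolean mightContain(String key) {
    return filter.mightContain(key);
  }
}
```

A file-footer source would simply be another `Supplier<KeyFilter>`, for example a lambda that opens a reader and returns its filter.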

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
##########
@@ -113,7 +110,7 @@ protected void start() {
       }
 
       List<Pair<String, String>> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet());
-      Map<Pair<String, String>, ByteBuffer> fileToBloomFilterMap =
+      Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =

Review comment:
      Is my understanding correct that the change from `ByteBuffer` to `BloomFilter` is only to avoid repeated decoding? Is there any other reason?
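If decoding cost is the motivation, the idea can be sketched as follows. Each serialized payload is turned into a filter object exactly once, and the decoded map is handed to the lookups.

`DecodeOnceSketch`, `decode` and `decodeAll` are hypothetical names. The `parser` function stands in for a bloom filter constructor.

A related detail: `Charset.decode(ByteBuffer)` consumes the buffer it reads, advancing its position to the limit. Decoding the same `ByteBuffer` a second time without `duplicate()` therefore yields an empty string.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class DecodeOnceSketch {
  // Decodes a UTF-8 payload from a view of the buffer. Charset.decode advances the
  // position of the buffer it reads, so duplicate() leaves the caller's buffer intact.
  static String decode(ByteBuffer serialized) {
    return StandardCharsets.UTF_8.decode(serialized.duplicate()).toString();
  }

  // Parses every raw payload exactly once; lookups then reuse the parsed objects
  // instead of decoding the same bytes again for each candidate key.
  static <F> Map<String, F> decodeAll(Map<String, ByteBuffer> raw, Function<String, F> parser) {
    Map<String, F> decoded = new HashMap<>();
    raw.forEach((file, buf) -> decoded.put(file, parser.apply(decode(buf))));
    return decoded;
  }
}
```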

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -53,56 +46,24 @@
 
   private final BloomFilter bloomFilter;
   private final List<String> candidateRecordKeys;
-  private final boolean useMetadataTableIndex;
-  private Option<String> fileName = Option.empty();
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
                                Pair<String, String> partitionPathFileIDPair) {
-    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
-  }
-
-  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                               Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
-                               boolean useMetadataTableIndex) {
     super(config, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    if (fileName.isPresent()) {
-      ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()),
-          "File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'");
-      this.fileName = fileName;
-    }
-    this.useMetadataTableIndex = useMetadataTableIndex;
     this.bloomFilter = getBloomFilter();
   }
 
   private BloomFilter getBloomFilter() {
-    BloomFilter bloomFilter = null;
     HoodieTimer timer = new HoodieTimer().startTimer();
-    try {
-      if (this.useMetadataTableIndex) {
-        ValidationUtils.checkArgument(this.fileName.isPresent(),
-            "File name not available to fetch bloom filter from the metadata table index.");
-        Option<ByteBuffer> bloomFilterByteBuffer =
-            hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get());
-        if (!bloomFilterByteBuffer.isPresent()) {
-          throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight());
-        }
-        bloomFilter =
-            new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
-                BloomFilterTypeCode.DYNAMIC_V0);
-      } else {
-        try (HoodieFileReader reader = createNewFileReader()) {
-          bloomFilter = reader.readBloomFilter();
-        }
-      }
+    try (HoodieFileReader reader = createNewFileReader()) {
+      LOG.debug(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));

Review comment:
      The bloom filter is read on the next line, so why end the timer here?
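The ordering the question implies can be sketched minimally: start the clock, perform the read, and stop the clock only after the read returns. `TimedRead` and `Result` are hypothetical helpers.

In `HoodieKeyLookupHandle`, the equivalent would presumably be to call `reader.readBloomFilter()` first. The result of `timer.endTimer()` would then go to the debug log afterwards.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class TimedRead {
  // A value together with how long it took to produce.
  static final class Result<T> {
    final T value;
    final long elapsedMs;

    Result(T value, long elapsedMs) {
      this.value = value;
      this.elapsedMs = elapsedMs;
    }
  }

  // Start the clock, perform the read, and stop the clock only after the read
  // returns, so the logged duration covers the read itself.
  static <T> Result<T> time(Supplier<T> read) {
    long start = System.nanoTime();
    T value = read.get();
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    return new Result<>(value, elapsedMs);
  }
}
```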

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -203,15 +206,19 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon
     metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
         (timer.endTimer() / partitionIDFileIDStrings.size())));
 
-    Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new HashMap<>();
+    Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new HashMap<>();
     for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : hoodieRecordList) {
       if (entry.getRight().isPresent()) {
         final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
             entry.getRight().get().getData().getBloomFilterMetadata();

Review comment:
      The method throws `HoodieMetadataException`, but this chained call can throw other exceptions as well. Should we handle them in a try-catch?
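If the answer is yes, a minimal sketch is to run the chained call inside a single try-catch. Anything unexpected is rethrown as the declared exception type, with the original kept as the cause.

`MetadataLookupException` is a hypothetical unchecked stand-in for `HoodieMetadataException`. `Guarded.orMetadataException` is an illustrative helper name, not existing Hudi code.

```java
import java.util.function.Supplier;

// Hypothetical unchecked stand-in for HoodieMetadataException.
class MetadataLookupException extends RuntimeException {
  MetadataLookupException(String message, Throwable cause) {
    super(message, cause);
  }
}

final class Guarded {
  // Runs a chained accessor and converts unexpected runtime failures (for example a
  // NullPointerException from a missing payload) into the declared exception type,
  // keeping the original exception as the cause.
  static <T> T orMetadataException(String action, Supplier<T> chainedCall) {
    try {
      return chainedCall.get();
    } catch (MetadataLookupException e) {
      throw e; // already the declared type; avoid double wrapping
    } catch (RuntimeException e) {
      throw new MetadataLookupException("Failed to " + action, e);
    }
  }
}
```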

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class MetadataRecordsGenerationParams implements Serializable {
+
+  private final HoodieTableMetaClient dataMetaClient;

Review comment:
       Do we need to serialize `dataMetaClient` as well?
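Shipping the meta client to executors may turn out to be costly or unnecessary; that is an assumption, not checked against `HoodieTableMetaClient` here. If so, one common pattern is to serialize only what is needed to rebuild the client and mark the live object `transient`.

In this sketch, `HeavyClient` and `GenerationParamsSketch` are hypothetical stand-ins. The client is recreated lazily from the base path after deserialization.

```java
import java.io.Serializable;

// Hypothetical stand-in for a heavyweight client; deliberately not Serializable.
final class HeavyClient {
  final String basePath;

  HeavyClient(String basePath) {
    this.basePath = basePath;
  }
}

// Sketch: ship only the base path and rebuild the client lazily where it is used.
final class GenerationParamsSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String basePath;
  private transient HeavyClient client; // skipped by Java serialization

  GenerationParamsSketch(String basePath) {
    this.basePath = basePath;
  }

  HeavyClient getClient() {
    if (client == null) {
      client = new HeavyClient(basePath);
    }
    return client;
  }
}
```

The trade-off is that each deserialized copy rebuilds its own client on first use. This may cost more than serializing the client if construction performs I/O.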

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -186,94 +179,92 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
   /**
    * Convert commit action metadata to bloom filter records.
    *
-   * @param commitMetadata - Commit action metadata
-   * @param dataMetaClient - Meta client for the data table
-   * @param instantTime    - Action instant time
-   * @return List of metadata table records
+   * @param context                 - Engine context to use
+   * @param commitMetadata          - Commit action metadata
+   * @param instantTime             - Action instant time
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
+   * @return HoodieData of metadata table records
    */
-  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
-                                                                       HoodieTableMetaClient dataMetaClient,
-                                                                       String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
-      final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
-      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
-      writeStats.forEach(hoodieWriteStat -> {
-        // No action for delta logs
-        if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
-          return;
-        }
+  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(
+      HoodieEngineContext context, HoodieCommitMetadata commitMetadata,
+      String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) {
+    final List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+    if (allWriteStats.isEmpty()) {
+      return context.emptyHoodieData();
+    }
 
-        String pathWithPartition = hoodieWriteStat.getPath();
-        if (pathWithPartition == null) {
-          // Empty partition
-          LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat);
-          return;
-        }
-        int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) :
-            partition.length() + 1;
+    HoodieData<HoodieWriteStat> allWriteStatsRDD = context.parallelize(allWriteStats,
+        Math.max(recordsGenerationParams.getBloomIndexParallelism(), allWriteStats.size()));
+    return allWriteStatsRDD.flatMap(hoodieWriteStat -> {
+      final String partition = hoodieWriteStat.getPartitionPath();
 
-        final String fileName = pathWithPartition.substring(offset);
-        if (!FSUtils.isBaseFile(new Path(fileName))) {
-          return;
-        }
-        ValidationUtils.checkState(!newFiles.containsKey(fileName), "Duplicate files in HoodieCommitMetadata");
+      // TODO: HUDI-1492 Delta write stat handling for schemes supporting appends
+      if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
+        return Collections.emptyListIterator();
+      }
 
-        final Path writeFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
+      String pathWithPartition = hoodieWriteStat.getPath();
+      if (pathWithPartition == null) {
+        // Empty partition
+        LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat);
+        return Collections.emptyListIterator();
+      }
+      int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) :
+          partition.length() + 1;
+
+      final String fileName = pathWithPartition.substring(offset);
+      if (!FSUtils.isBaseFile(new Path(fileName))) {
+        return Collections.emptyListIterator();
+      }
+
+      final Path writeFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+      try (HoodieFileReader<IndexedRecord> fileReader =
+               HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), writeFilePath)) {
         try {
-          HoodieFileReader<IndexedRecord> fileReader =
-              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), writeFilePath);
-          try {
-            final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-            if (fileBloomFilter == null) {
-              LOG.error("Failed to read bloom filter for " + writeFilePath);
-              return;
-            }
-            ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
-            HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
-                partition, fileName, instantTime, bloomByteBuffer, false);
-            records.add(record);
-          } catch (Exception e) {
+          final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+          if (fileBloomFilter == null) {
             LOG.error("Failed to read bloom filter for " + writeFilePath);
-            return;
+            return Collections.emptyListIterator();
           }
+          ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+          HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
+              partition, fileName, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);

Review comment:
      This part looks similar to what is being done at #L650. Is there a way to extract it and enhance reuse?
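One possible extraction is sketched here, assuming both call sites differ only in how they obtain the filter. Each caller reads and serializes the filter, and a shared helper performs the null check and builds the record. `BloomRecord` and `BloomRecordHelper.toBloomFilterRecord` are hypothetical names standing in for `HoodieMetadataPayload.createBloomFilterMetadataRecord` and its inputs.

The sketch also encodes with an explicit UTF-8 charset, whereas the diff calls `getBytes()` with the platform default charset. The removed metadata-table read path decoded with `StandardCharsets.UTF_8`, so an explicit charset keeps both sides symmetric.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical stand-in for the metadata-table bloom filter record.
final class BloomRecord {
  final String partition;
  final String fileName;
  final String instantTime;
  final String filterType;
  final ByteBuffer payload;

  BloomRecord(String partition, String fileName, String instantTime, String filterType, ByteBuffer payload) {
    this.partition = partition;
    this.fileName = fileName;
    this.instantTime = instantTime;
    this.filterType = filterType;
    this.payload = payload;
  }
}

final class BloomRecordHelper {
  // Shared tail of both call sites: skip files whose filter could not be read,
  // otherwise encode the serialized filter and build the record.
  static Optional<BloomRecord> toBloomFilterRecord(String partition, String fileName, String instantTime,
                                                   String filterType, String serializedFilter) {
    if (serializedFilter == null) {
      return Optional.empty();
    }
    // Explicit UTF-8 rather than String.getBytes(), which uses the platform default charset.
    ByteBuffer payload = ByteBuffer.wrap(serializedFilter.getBytes(StandardCharsets.UTF_8));
    return Optional.of(new BloomRecord(partition, fileName, instantTime, filterType, payload));
  }
}
```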




-- 
This is an automated message from the Apache Git Service.