Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/24 02:32:39 UTC

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4848: [HUDI-3356][HUDI-3203] HoodieData for metadata index records, bloom and colstats init

alexeykudinkin commented on a change in pull request #4848:
URL: https://github.com/apache/hudi/pull/4848#discussion_r813474629



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -339,6 +385,13 @@ private void processAppendResult(AppendResult result) {
       updateWriteStatus(stat, result);
     }
 
+    if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {

Review comment:
      Why is this check so specific to whether all columns are enabled? It's OK if we don't handle the use case of collecting stats for a subset of columns for now (since we don't have a config for it) and just leave a TODO here, but I don't think we need to be this specific in the check.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -320,7 +325,48 @@ private void updateWriteStatus(HoodieDeltaWriteStat stat, AppendResult result) {
     statuses.add(this.writeStatus);
   }
 
-  private void processAppendResult(AppendResult result) {
+  /**
+   * Compute column statistics for the records part of this append handle.
+   *
+   * @param filePath       - Log file that records are part of
+   * @param recordList     - List of records appended to the log for which column statistics is needed for
+   * @param columnRangeMap - Output map to accumulate the column statistics for the records
+   */
+  private void computeRecordsStats(final String filePath, List<IndexedRecord> recordList,
+                                   Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap) {
+    recordList.forEach(record -> accumulateColumnRanges(record, writeSchemaWithMetaFields, filePath, columnRangeMap, config.isConsistentLogicalTimestampEnabled()));
+  }
+
+  /**
+   * Accumulate column range statistics for the requested record.
+   *
+   * @param record   - Record to get the column range statistics for
+   * @param schema   - Schema for the record
+   * @param filePath - File that record belongs to
+   */
+  private static void accumulateColumnRanges(IndexedRecord record, Schema schema, String filePath,
+          Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap, boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+    schema.getFields().forEach(field -> {
+      final String fieldVal = HoodieAvroUtils.getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      final HoodieColumnRangeMetadata<Comparable> fieldRange = new HoodieColumnRangeMetadata<>(

Review comment:
      I don't think it's a good idea to create a new `HoodieColumnRangeMetadata` object for every record (it's a pretty large object).
   
   Instead, for every field we can iterate over all records, computing the metrics locally (on the stack, in local vars), and then populate `HoodieColumnRangeMetadata` once; see the sketch below.
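   A minimal sketch of that restructuring. The `HoodieColumnRangeMetadata` constructor shape assumed here (filePath, columnName, min, max, nullCount, valueCount, totalSize) is a guess; the exact signature is in the PR.
   
   ```java
   private static void accumulateColumnRanges(List<IndexedRecord> records, Schema schema, String filePath,
                                              Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
                                              boolean consistentLogicalTimestampEnabled) {
     for (Schema.Field field : schema.getFields()) {
       // Track the per-field metrics in local variables across all records ...
       String minVal = null;
       String maxVal = null;
       long nullCount = 0;
       long totalSize = 0;
       for (IndexedRecord record : records) {
         String val = HoodieAvroUtils.getNestedFieldValAsString(
             (GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
         if (val == null) {
           nullCount++;
           continue;
         }
         totalSize += val.length();
         minVal = (minVal == null || val.compareTo(minVal) < 0) ? val : minVal;
         maxVal = (maxVal == null || val.compareTo(maxVal) > 0) ? val : maxVal;
       }
       // ... and allocate the (large) HoodieColumnRangeMetadata object only once per field.
       columnRangeMap.put(field.name(), new HoodieColumnRangeMetadata<>(
           filePath, field.name(), minVal, maxVal, nullCount, records.size(), totalSize));
     }
   }
   ```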

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -848,41 +857,40 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
     }
   }
 
-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
-  }
-
   public static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                                      HoodieTableMetaClient datasetMetaClient,
-                                                                     List<String> latestColumns) {
-    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, latestColumns, false);
-
+                                                                     List<String> columnsToIndex) {
+    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
+      Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> columnRangeMap =

Review comment:
       What's the point of Option here?
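   For illustration, a sketch of reading the stats directly once `isPresent()` has been checked; the `getStats()` accessor on the wrapper is a hypothetical name:
   
   ```java
   // The isPresent() guard in the surrounding condition already guarantees the
   // stats exist, so the value can be unwrapped directly instead of being
   // re-wrapped in another Option. getStats() is a hypothetical accessor.
   Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap =
       ((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats();
   ```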

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
##########
@@ -69,4 +73,24 @@ public void addLogFiles(String logFile) {
   public List<String> getLogFiles() {
     return logFiles;
   }
+
+  public void setRecordsStats(RecordsStats<? extends Map> stats) {
+    recordsStats = Option.of(stats);
+  }
+
+  public Option<RecordsStats<? extends Map>> getRecordsStats() {
+    return recordsStats;
+  }
+
+  public static class RecordsStats<T> implements Serializable {

Review comment:
       What do we need this wrapper for? 
   

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -320,7 +325,48 @@ private void updateWriteStatus(HoodieDeltaWriteStat stat, AppendResult result) {
     statuses.add(this.writeStatus);
   }
 
-  private void processAppendResult(AppendResult result) {
+  /**
+   * Compute column statistics for the records part of this append handle.
+   *
+   * @param filePath       - Log file that records are part of
+   * @param recordList     - List of records appended to the log for which column statistics is needed for
+   * @param columnRangeMap - Output map to accumulate the column statistics for the records
+   */
+  private void computeRecordsStats(final String filePath, List<IndexedRecord> recordList,
+                                   Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap) {
+    recordList.forEach(record -> accumulateColumnRanges(record, writeSchemaWithMetaFields, filePath, columnRangeMap, config.isConsistentLogicalTimestampEnabled()));

Review comment:
      I think we can inline this method, given it's a one-liner and isn't used anywhere else; the inlined form is sketched below.
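   Inlining would simply move the existing one-liner to the call site in `processAppendResult`:
   
   ```java
   // Body of computeRecordsStats() moved to its single call site; no behavior change.
   recordList.forEach(record -> accumulateColumnRanges(record, writeSchemaWithMetaFields,
       filePath, columnRangeMap, config.isConsistentLogicalTimestampEnabled()));
   ```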

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -330,78 +322,67 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
       });
     });
 
-    return engineContext.map(deleteFileList, deleteFileInfo -> {
-      return HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, ByteBuffer.allocate(0), true);
-    }, 1).stream().collect(Collectors.toList());
+    HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList,

Review comment:
      Do we really need to push this action through an RDD? Do we envision that this will scale past the point where we can no longer handle it on the driver?
   
   I'm worried about the serialization cost we incur for every record handled through an RDD (serializing/deserializing the closure) just to create a single object; see the driver-side sketch below.
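   A sketch of the driver-side alternative. The method name is illustrative; the `createBloomFilterMetadataRecord` call mirrors the one in this diff.
   
   ```java
   // Build the deletion records with a plain Java stream on the driver instead of
   // shipping a closure through an RDD just to construct one object per entry.
   private static List<HoodieRecord> createBloomDeletionRecords(List<Pair<String, String>> deleteFileList,
                                                                String instantTime) {
     return deleteFileList.stream()
         .map(deleteFileInfo -> HoodieMetadataPayload.createBloomFilterMetadataRecord(
             deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime,
             StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true))
         .collect(Collectors.toList());
   }
   ```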

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -169,6 +169,12 @@
           + "store the column ranges and will be used for pruning files during the index lookups. "
           + "Only applies if " + ENABLE_METADATA_INDEX_COLUMN_STATS.key() + " is enabled.");
 
+  public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_PARALLELISM = ConfigProperty

Review comment:
      This seems to be too low-level a lever to expose as config:
   
    - If we want to determine the optimal parallelism, we should use the number of cores as a proxy.
    - If we want this to be a cap on how much parallelism we allow, we should rename it accordingly (and also generalize it to cover all metadata activities); see the sketch after this list.
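   A sketch of the second option, treating the config value purely as a cap (names here are illustrative, not from the PR):
   
   ```java
   // Derive the working parallelism from the available cores and use the
   // configured value only as an upper bound.
   private static int resolveMetadataIndexParallelism(int configuredCap) {
     int cores = Runtime.getRuntime().availableProcessors();
     return Math.max(1, Math.min(configuredCap, cores));
   }
   ```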

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -347,64 +348,62 @@ public void initTableMetadata() {
   /**
    * Bootstrap the metadata table if needed.
    *
-   * @param engineContext  - Engine context
-   * @param dataMetaClient - Meta client for the data table
-   * @param actionMetadata - Optional action metadata
-   * @param <T>            - Action metadata types extending Avro generated SpecificRecordBase
+   * @param dataMetaClient           - Meta client for the data table

Review comment:
       This seems off. Was this intentional?
   

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -121,10 +123,28 @@
   private HoodieMetadataBloomFilter bloomFilterMetadata = null;
   private HoodieMetadataColumnStats columnStatMetadata = null;
 
+  public static final BiFunction<HoodieMetadataColumnStats, HoodieMetadataColumnStats, HoodieMetadataColumnStats> COLUMN_STATS_MERGE_FUNCTION =

Review comment:
      Let's extract this to `HoodieTableMetadataUtil` as just a normal function; there's not much value in maintaining it as a static constant. A sketch of the extraction follows.
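   A sketch of the first step of that extraction, delegating to the existing constant; its body would then be inlined here and the constant removed:
   
   ```java
   // In HoodieTableMetadataUtil: a plain static method in place of the
   // BiFunction constant on the payload class.
   public static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataColumnStats prev,
                                                                   HoodieMetadataColumnStats next) {
     return HoodieMetadataPayload.COLUMN_STATS_MERGE_FUNCTION.apply(prev, next);
   }
   ```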

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -53,56 +45,23 @@
 
   private final BloomFilter bloomFilter;
   private final List<String> candidateRecordKeys;
-  private final boolean useMetadataTableIndex;
-  private Option<String> fileName = Option.empty();
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
                                Pair<String, String> partitionPathFileIDPair) {
-    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
-  }
-
-  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                               Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
-                               boolean useMetadataTableIndex) {
     super(config, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    if (fileName.isPresent()) {
-      ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()),
-          "File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'");
-      this.fileName = fileName;
-    }
-    this.useMetadataTableIndex = useMetadataTableIndex;
     this.bloomFilter = getBloomFilter();
   }
 
   private BloomFilter getBloomFilter() {
-    BloomFilter bloomFilter = null;
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    try {
-      if (this.useMetadataTableIndex) {

Review comment:
       Can you please help me understand why this is changing?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -203,7 +204,7 @@ private void enablePartitions() {
    * @param metadataConfig       - Table config
    * @param metaClient           - Meta client for the metadata table
    * @param fsView               - Metadata table filesystem view to use
-   * @param isBootstrapCompleted - Is metadata table bootstrap completed
+   * @param isBootstrapCompleted - Is metadata table initialize completed

Review comment:
       nit: correct form would be "initializing"

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -339,6 +385,13 @@ private void processAppendResult(AppendResult result) {
       updateWriteStatus(stat, result);
     }
 
+    if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()

Review comment:
      You can do `getRecordsStats().getOrElse(() -> new HashMap())`; see the sketch below.
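   In context that would look roughly like the following, assuming `RecordsStats` exposes the wrapped map via a `getStats()`-style accessor and that the `Option` type here offers the supplier-based `getOrElse` used above:
   
   ```java
   // Replaces the isPresent() ternary with a single defaulted lookup.
   Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap =
       stat.getRecordsStats()
           .map(RecordsStats::getStats)       // hypothetical accessor
           .getOrElse(() -> new HashMap<>());
   ```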

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -330,78 +322,67 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
       });
     });
 
-    return engineContext.map(deleteFileList, deleteFileInfo -> {
-      return HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, ByteBuffer.allocate(0), true);
-    }, 1).stream().collect(Collectors.toList());
+    HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList,
+        Math.max(deleteFileList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+    return deleteFileListRDD.map(deleteFileInfo -> HoodieMetadataPayload.createBloomFilterMetadataRecord(
+        deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, StringUtils.EMPTY_STRING,
+        ByteBuffer.allocate(0), true));
   }
 
   /**
    * Convert clean metadata to column stats index records.
    *
-   * @param cleanMetadata     - Clean action metadata
-   * @param engineContext     - Engine context
-   * @param datasetMetaClient - data table meta client
+   * @param cleanMetadata           - Clean action metadata
+   * @param engineContext           - Engine context
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
    * @return List of column stats index records for the clean metadata
    */
-  public static List<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       HoodieTableMetaClient datasetMetaClient) {
+  public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
+                                                                             HoodieEngineContext engineContext,
+                                                                             MetadataRecordsGenerationParams recordsGenerationParams) {
     List<Pair<String, String>> deleteFileList = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
       // Files deleted from a partition
       List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
       deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
     });
 
-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    return engineContext.flatMap(deleteFileList,
-        deleteFileInfo -> {
-          if (deleteFileInfo.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            return getColumnStats(deleteFileInfo.getKey(), deleteFileInfo.getValue(), datasetMetaClient,
-                latestColumns, true);
-          }
-          return Stream.empty();
-        }, 1).stream().collect(Collectors.toList());
+    final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled());
+    HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList,
+        Math.max(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()));
+    return deleteFileListRDD.flatMap(deleteFileInfo -> {
+      if (deleteFileInfo.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {

Review comment:
      Let's use the Pair API consistently (either getKey/getValue or getLeft/getRight); it's quite confusing to see them mixed.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -330,78 +322,67 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
       });
     });
 
-    return engineContext.map(deleteFileList, deleteFileInfo -> {
-      return HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, ByteBuffer.allocate(0), true);
-    }, 1).stream().collect(Collectors.toList());
+    HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList,
+        Math.max(deleteFileList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+    return deleteFileListRDD.map(deleteFileInfo -> HoodieMetadataPayload.createBloomFilterMetadataRecord(

Review comment:
      Let's create a common overload for this method (it seems to be used in at least 3 more places); a sketch follows.
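   For example, an overload that defaults the deletion-case arguments (sketch; the meaning of the defaulted string parameter is assumed from the call above):
   
   ```java
   // Common overload so callers that only record deletions don't repeat the
   // empty-string/empty-buffer arguments at every call site.
   public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(String partitionName,
                                                                                     String fileName,
                                                                                     String instantTime) {
     return createBloomFilterMetadataRecord(partitionName, fileName, instantTime,
         StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true);
   }
   ```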




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org