Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/08 23:06:15 UTC

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4848: [HUDI-3258] HoodieData for metadata index records, bloom and colstats init

alexeykudinkin commented on a change in pull request #4848:
URL: https://github.com/apache/hudi/pull/4848#discussion_r822126414



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -339,6 +343,19 @@ private void processAppendResult(AppendResult result) {
       updateWriteStatus(stat, result);
     }
 
+    if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
+          ? stat.getRecordsStats().get().getStats() : new HashMap<>();

Review comment:
       @codope this is what I was referring to in my comments about the increased complexity around `RecordsStats`. Why not just have `stat.getRecordsStats().get()` instead?
   
   Now, anyone reading this code needs to understand what this additional `getStats()` call is about and why it's needed, whereas without it the call site is crystal clear and doesn't require scanning through `getRecordsStats` to understand what's going on.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -339,6 +343,19 @@ private void processAppendResult(AppendResult result) {
       updateWriteStatus(stat, result);
     }
 
+    if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
+          ? stat.getRecordsStats().get().getStats() : new HashMap<>();
+      final String filePath = stat.getPath();
+      // initialize map of column name to map of stats name to stats value
+      Map<String, Map<String, Object>> columnToStats = new HashMap<>();
+      writeSchemaWithMetaFields.getFields().forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
+      // collect stats for columns at once per record and keep iterating through every record to eventually find col stats for all fields.
+      recordList.forEach(record -> aggregateColumnStats(record, writeSchemaWithMetaFields, columnToStats, config.isConsistentLogicalTimestampEnabled()));

Review comment:
       Instead of splitting iteration and aggregation across separate methods, can we consolidate them in `aggregateColumnStats`, so that its signature becomes:
   
   ```
   Map<String, Map<...>> aggregateColumnStats(records, writeSchema, ...)
   ```
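
To make the suggested shape concrete, here is a minimal sketch of a method that both iterates and aggregates, returning its result rather than mutating a caller-supplied map. Everything in it is an assumption for illustration: records are modelled as plain `Map<String, Object>` instead of Avro records, and `ConsolidatedAggregation` and `Counts` are hypothetical names, not Hudi classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ConsolidatedAggregation {

  /** Hypothetical per-column counters; not a Hudi class. */
  static final class Counts {
    long valueCount;
    long nullCount;
  }

  /**
   * Iterates over all records and aggregates per-column counters in one call,
   * returning a fresh result map instead of updating one passed in by the caller.
   */
  static Map<String, Counts> aggregateColumnStats(List<Map<String, Object>> records,
                                                  List<String> columns) {
    Map<String, Counts> result = new HashMap<>();
    // One accumulator per column, allocated once up front.
    for (String column : columns) {
      result.put(column, new Counts());
    }
    for (Map<String, Object> record : records) {
      for (String column : columns) {
        Counts counts = result.get(column);
        if (record.get(column) == null) {
          counts.nullCount++;
        } else {
          counts.valueCount++;
        }
      }
    }
    return result;
  }
}
```

With this shape the call site reduces to a single assignment, and the loop over records is no longer the caller's concern.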

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Encapsulates all parameters required to generate metadata index for enabled index types.
+ */
+public class MetadataRecordsGenerationParams implements Serializable {
+
+  private final HoodieTableMetaClient dataMetaClient;

Review comment:
       Let's limit the scope of this component to just _parameters_ for index generation. Otherwise it has the potential to become a dependency magnet, with random dependencies added here just to avoid threading them through.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record                            - current record
+   * @param schema                            - write schema
+   * @param columnToStats                     - map of column to map of each stat and its value which gets updates in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
+      columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
+
+      if (!StringUtils.isNullOrEmpty(fieldVal)) {
+        // set the min value of the field
+        if (!columnStats.containsKey(MIN)) {
+          columnStats.put(MIN, fieldVal);
+        }
+        if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {

Review comment:
       We can't compare values as strings; this is incorrect, since lexicographically "12" < "2".
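
A small sketch of the problem, assuming an integer column (the class and method names are hypothetical and only for illustration): taking the minimum over string renderings picks the wrong value as soon as the numbers differ in digit count. `typedMin` assumes a non-empty list.

```java
import java.util.Collections;
import java.util.List;

final class StringVersusTypedMin {

  /** Minimum under String ordering, i.e. character-by-character comparison. */
  static String lexicographicMin(List<Integer> values) {
    String min = null;
    for (Integer value : values) {
      String rendered = String.valueOf(value);
      if (min == null || rendered.compareTo(min) < 0) {
        min = rendered;
      }
    }
    return min;
  }

  /** Minimum under the values' natural (numeric) ordering. */
  static Integer typedMin(List<Integer> values) {
    return Collections.min(values);
  }
}
```

For the values 2 and 12, `lexicographicMin` returns "12" because '1' sorts before '2', while `typedMin` returns 2.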

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -329,14 +332,16 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
   /**
    * Convert clean metadata to bloom filter index records.
    *
-   * @param cleanMetadata - Clean action metadata
-   * @param engineContext - Engine context
-   * @param instantTime   - Clean action instant time
+   * @param cleanMetadata           - Clean action metadata
+   * @param engineContext           - Engine context
+   * @param instantTime             - Clean action instant time
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
    * @return List of bloom filter index records for the clean metadata
    */
-  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       String instantTime) {
+  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,

Review comment:
       nit: By general convention, "context" objects are usually passed as the first argument.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Encapsulates all parameters required to generate metadata index for enabled index types.
+ */
+public class MetadataRecordsGenerationParams implements Serializable {
+
+  private final HoodieTableMetaClient dataMetaClient;

Review comment:
       BTW, I see this class is `Serializable`. How are we serializing the `metaClient`?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -329,14 +332,16 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
   /**
    * Convert clean metadata to bloom filter index records.
    *
-   * @param cleanMetadata - Clean action metadata
-   * @param engineContext - Engine context
-   * @param instantTime   - Clean action instant time
+   * @param cleanMetadata           - Clean action metadata
+   * @param engineContext           - Engine context
+   * @param instantTime             - Clean action instant time
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
    * @return List of bloom filter index records for the clean metadata
    */
-  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       String instantTime) {
+  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,

Review comment:
       Just FYI, no need to fix this 

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,

Review comment:
       Can we unify both of these methods into one?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -867,41 +889,56 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
     }
   }
 
-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
+  public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) {
+    ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
+    if (newColumnStats.getIsDeleted()) {
+      return newColumnStats;
+    }
+    return HoodieMetadataColumnStats.newBuilder()
+        .setFileName(newColumnStats.getFileName())
+        .setMinValue(Stream.of(oldColumnStats.getMinValue(), newColumnStats.getMinValue()).filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null))
+        .setMaxValue(Stream.of(oldColumnStats.getMinValue(), newColumnStats.getMinValue()).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null))
+        .setValueCount(oldColumnStats.getValueCount() + newColumnStats.getValueCount())
+        .setNullCount(oldColumnStats.getNullCount() + newColumnStats.getNullCount())
+        .setTotalSize(oldColumnStats.getTotalSize() + newColumnStats.getTotalSize())
+        .setTotalUncompressedSize(oldColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize())
+        .setIsDeleted(newColumnStats.getIsDeleted())
+        .build();
   }
 
   public static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                                      HoodieTableMetaClient datasetMetaClient,
-                                                                     List<String> latestColumns) {
-    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, latestColumns, false);
-
+                                                                     List<String> columnsToIndex) {
+    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats();
+      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.values());
+      return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
+    }
+    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex,false);
   }
 
   private static Stream<HoodieRecord> getColumnStats(final String partitionPath, final String filePathWithPartition,
                                                      HoodieTableMetaClient datasetMetaClient,
-                                                     List<String> columns, boolean isDeleted) {
-    final String partition = partitionPath.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionPath;
+                                                     List<String> columnsToIndex,
+                                                     boolean isDeleted) {
+    final String partition = getPartition(partitionPath);
     final int offset = partition.equals(NON_PARTITIONED_NAME) ? (filePathWithPartition.startsWith("/") ? 1 : 0)
         : partition.length() + 1;
     final String fileName = filePathWithPartition.substring(offset);
-    if (!FSUtils.isBaseFile(new Path(fileName))) {
-      return Stream.empty();
-    }
 
     if (filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
       List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>();
       final Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePathWithPartition);
       if (!isDeleted) {

Review comment:
       Handling of deleted files is independent of the file format, right?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record                            - current record
+   * @param schema                            - write schema
+   * @param columnToStats                     - map of column to map of each stat and its value which gets updates in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
+      columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
+
+      if (!StringUtils.isNullOrEmpty(fieldVal)) {
+        // set the min value of the field
+        if (!columnStats.containsKey(MIN)) {
+          columnStats.put(MIN, fieldVal);
+        }
+        if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {
+          columnStats.put(MIN, fieldVal);
+        }
+        // set the max value of the field
+        if (fieldVal.compareTo(String.valueOf(columnStats.getOrDefault(MAX, ""))) > 0) {
+          columnStats.put(MAX, fieldVal);

Review comment:
       We don't need a `Map` for that, right? Let's instead create a mutable object holding all the statistics we're collecting:
   
   ```
   class FileColumnStats {
     Object min, max;
     long count, totalSize;
     // ...
   }
   ```
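
One way the mutable accumulator could be fleshed out is sketched below. This is an assumption-laden illustration, not the PR's code: the generic bound, the `add` method, and the `valueCount`/`nullCount` field names are hypothetical, and size tracking is left out for brevity.

```java
final class FileColumnStats<T extends Comparable<T>> {
  // Typed bounds; both stay null until the first non-null value is seen.
  T min;
  T max;
  long valueCount;
  long nullCount;

  /** Folds one column value into the running statistics. */
  void add(T value) {
    if (value == null) {
      nullCount++;
      return;
    }
    valueCount++;
    // Comparisons use the value's own ordering, not its string form.
    if (min == null || value.compareTo(min) < 0) {
      min = value;
    }
    if (max == null || value.compareTo(max) > 0) {
      max = value;
    }
  }
}
```

Because the counters are primitive `long` fields, there is nothing to parse or box on each update, and the bounds keep their original type.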

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record                            - current record
+   * @param schema                            - write schema
+   * @param columnToStats                     - map of column to map of each stat and its value which gets updates in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());

Review comment:
       Please avoid such `HashMap` allocations, since this just churns objects.
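
For illustration, `Map.getOrDefault(key, new HashMap<>())` evaluates its second argument on every call, whether or not the key is present, and never stores the default in the map. A minimal sketch of an alternative using `Map.computeIfAbsent`, which allocates only on a miss and registers the new map (the helper name `statsFor` is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

final class LazyStatsLookup {

  /** Returns the stats map for a column, creating and registering it only on first use. */
  static Map<String, Object> statsFor(Map<String, Map<String, Object>> columnToStats,
                                      String column) {
    // The lambda runs only when the key is absent, so repeated lookups allocate nothing.
    return columnToStats.computeIfAbsent(column, key -> new HashMap<>());
  }
}
```

Repeated calls for the same column return the same instance, so updates made through it are never lost.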

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
##########
@@ -69,4 +73,24 @@ public void addLogFiles(String logFile) {
   public List<String> getLogFiles() {
     return logFiles;
   }
+
+  public void setRecordsStats(RecordsStats<? extends Map> stats) {
+    recordsStats = Option.of(stats);
+  }
+
+  public Option<RecordsStats<? extends Map>> getRecordsStats() {
+    return recordsStats;
+  }
+
+  public static class RecordsStats<T> implements Serializable {

Review comment:
       @codope I'm concerned that this abstraction doesn't bring much value while increasing complexity: it adds cognitive load for anybody interacting with it, who first has to understand what it does.
   
   In general, I'd suggest following the principle: _keep things as simple as possible, but no simpler than needed to solve the problem_. It helps on many fronts:
   
   1. Makes the code easier to comprehend
   2. Makes component evolution easier (the simpler things are, the easier it is to evolve them)
   3. Makes the component age better: if things change and we need to refactor it, the simpler the system is, the easier the refactoring will be
   

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -651,6 +641,14 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
     }
   }
 
+  private MetadataRecordsGenerationParams getRecordsGenerationParams() {
+    return new MetadataRecordsGenerationParams(

Review comment:
       BTW, why do we even need this component if we can just get all of this from the Writer Config?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record                            - current record
+   * @param schema                            - write schema
+   * @param columnToStats                     - map of column to map of each stat and its value which gets updates in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);

Review comment:
       Why do we need to `parseLong` every time?
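
If a map of counters is kept at all, typing its values as `Long` removes the string round-trip. A minimal sketch, assuming the size counters live in a `Map<String, Long>` (the class and method names here are hypothetical):

```java
import java.util.Map;

final class TypedCounters {

  /** Adds delta to the named counter, treating a missing entry as zero. */
  static void addSize(Map<String, Long> totals, String statName, long delta) {
    // merge() stores delta when the key is absent, otherwise sums old and new.
    totals.merge(statName, delta, Long::sum);
  }
}
```

No `toString()` or `Long.parseLong` is involved, and the zero default is implicit in `merge`.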

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -867,41 +889,56 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
     }
   }
 
-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
+  public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) {
+    ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
+    if (newColumnStats.getIsDeleted()) {

Review comment:
       We need to handle the inverse case as well, when the existing record is a deleted one; otherwise we will merge incorrectly.
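
A sketch of what a symmetric check might look like, on a simplified stand-in for the stats record. The `Stats` class is hypothetical, and the rule that a deleted existing record is simply replaced by the newer one is an assumption about the intended semantics, not something the PR confirms.

```java
final class DeletionAwareMerge {

  /** Hypothetical, simplified stand-in for a column stats record. */
  static final class Stats {
    final long valueCount;
    final boolean deleted;

    Stats(long valueCount, boolean deleted) {
      this.valueCount = valueCount;
      this.deleted = deleted;
    }
  }

  static Stats merge(Stats oldStats, Stats newStats) {
    // Assumption: if either side is a deletion there is nothing valid to combine,
    // so the newer record is returned as-is.
    if (newStats.deleted || oldStats.deleted) {
      return newStats;
    }
    return new Stats(oldStats.valueCount + newStats.valueCount, false);
  }
}
```

Without the `oldStats.deleted` branch, counts from a deleted record would be added into the fresh statistics.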

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -941,4 +978,72 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Accumulates column range metadata for the given field and updates the column range map.
+   *
+   * @param field          - column for which statistics will be computed
+   * @param filePath       - data file path
+   * @param columnRangeMap - old column range statistics, which will be merged in this computation
+   * @param columnToStats  - map of column to map of each stat and its value
+   */
+  public static void accumulateColumnRanges(Schema.Field field, String filePath,
+                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
+                                            Map<String, Map<String, Object>> columnToStats) {
+    Map<String, Object> columnStats = columnToStats.get(field.name());
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+        filePath,
+        field.name(),
+        String.valueOf(columnStats.get(MIN)),
+        String.valueOf(columnStats.get(MAX)),
+        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
+        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
+    );
+    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
+  }
+
+  /**
+   * Aggregates column stats for each field.
+   *
+   * @param record                            - current record
+   * @param schema                            - write schema
+   * @param columnToStats                     - map of column to map of each stat and its value which gets updates in this method
+   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   */
+  public static void aggregateColumnStats(IndexedRecord record, Schema schema,
+                                          Map<String, Map<String, Object>> columnToStats,
+                                          boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+      // update stats
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
+      columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
+
+      if (!StringUtils.isNullOrEmpty(fieldVal)) {
+        // set the min value of the field
+        if (!columnStats.containsKey(MIN)) {
+          columnStats.put(MIN, fieldVal);
+        }
+        if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {

Review comment:
       We can leverage Parquet's comparators for that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org