You are viewing a plain-text version of this content; the canonical link to the original message is available in the HTML version of this archive page.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/03/24 12:08:08 UTC

[hudi] branch master updated: [HUDI-3684] Fixing NPE in `ParquetUtils` (#5102)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ccc3728  [HUDI-3684] Fixing NPE in `ParquetUtils` (#5102)
ccc3728 is described below

commit ccc3728002533978cb35b3b4c22cb4d0ef087347
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Mar 24 05:07:38 2022 -0700

    [HUDI-3684] Fixing NPE in `ParquetUtils` (#5102)
    
    * Make sure nulls are properly handled in `HoodieColumnRangeMetadata`
---
 .../common/model/HoodieColumnRangeMetadata.java    | 58 ++++++++++++++++++----
 .../org/apache/hudi/common/util/ParquetUtils.java  | 29 ++++++++---
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  9 ++--
 .../utilities/HoodieMetadataTableValidator.java    |  4 +-
 4 files changed, 75 insertions(+), 25 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
index d098c4f..2afbd19 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
@@ -18,19 +18,27 @@
 
 package org.apache.hudi.common.model;
 
+import javax.annotation.Nullable;
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Objects;
 import java.util.function.BiFunction;
+import java.util.stream.Stream;
 
 /**
- * Hoodie Range metadata.
+ * Hoodie metadata for the column range of data stored in columnar format (like Parquet)
+ *
+ * NOTE: {@link Comparable} is used as a raw type so that we can handle polymorphism, where
+ *        the caller a priori is not aware of the type {@link HoodieColumnRangeMetadata} is
+ *        associated with
  */
-public class HoodieColumnRangeMetadata<T> implements Serializable {
+@SuppressWarnings("rawtype")
+public class HoodieColumnRangeMetadata<T extends Comparable> implements Serializable {
   private final String filePath;
   private final String columnName;
+  @Nullable
   private final T minValue;
+  @Nullable
   private final T maxValue;
   private final long nullCount;
   private final long valueCount;
@@ -38,21 +46,30 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
   private final long totalUncompressedSize;
 
   public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION =
-      (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<>(
+      (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<Comparable>(
           newColumnRange.getFilePath(),
           newColumnRange.getColumnName(),
-          (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
-              .stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null),
-          (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
-              .stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null),
+          (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
+              .filter(Objects::nonNull)
+              .min(Comparator.naturalOrder())
+              .orElse(null),
+          (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
+              .filter(Objects::nonNull)
+              .max(Comparator.naturalOrder()).orElse(null),
           oldColumnRange.getNullCount() + newColumnRange.getNullCount(),
           oldColumnRange.getValueCount() + newColumnRange.getValueCount(),
           oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(),
           oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize()
       );
 
-  public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue,
-                                   final long nullCount, long valueCount, long totalSize, long totalUncompressedSize) {
+  private HoodieColumnRangeMetadata(String filePath,
+                                    String columnName,
+                                    @Nullable T minValue,
+                                    @Nullable T maxValue,
+                                    long nullCount,
+                                    long valueCount,
+                                    long totalSize,
+                                    long totalUncompressedSize) {
     this.filePath = filePath;
     this.columnName = columnName;
     this.minValue = minValue;
@@ -71,10 +88,12 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
     return this.columnName;
   }
 
+  @Nullable
   public T getMinValue() {
     return this.minValue;
   }
 
+  @Nullable
   public T getMaxValue() {
     return this.maxValue;
   }
@@ -133,6 +152,23 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
         + '}';
   }
 
+  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> create(String filePath,
+                                                                              String columnName,
+                                                                              @Nullable T minValue,
+                                                                              @Nullable T maxValue,
+                                                                              long nullCount,
+                                                                              long valueCount,
+                                                                              long totalSize,
+                                                                              long totalUncompressedSize) {
+    return new HoodieColumnRangeMetadata<>(filePath, columnName, minValue, maxValue, nullCount, valueCount, totalSize, totalUncompressedSize);
+  }
+
+  @SuppressWarnings("rawtype")
+  public static HoodieColumnRangeMetadata<Comparable> stub(String filePath,
+                                                           String columnName) {
+    return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1);
+  }
+
   /**
    * Statistics that is collected in {@link org.apache.hudi.metadata.MetadataPartitionType#COLUMN_STATS} index.
    */
@@ -144,6 +180,6 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
     public static final String TOTAL_SIZE = "total_size";
     public static final String TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size";
 
-    private Stats() {  }
+    private Stats() {}
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e74f4f7..c0f7aab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -58,6 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -288,18 +289,27 @@ public class ParquetUtils extends BaseFileUtils {
   /**
    * Parse min/max statistics stored in parquet footers for all columns.
    */
+  @SuppressWarnings("rawtype")
   public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
       @Nonnull Configuration conf,
       @Nonnull Path parquetFilePath,
       @Nonnull List<String> cols
   ) {
     ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
+
+    // NOTE: This collector has to have fully specialized generic type params since
+    //       Java 1.8 struggles to infer them
+    Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector =
+        Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);
+
     // Collect stats from all individual Parquet blocks
-    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().sequential()
-            .flatMap(blockMetaData -> blockMetaData.getColumns().stream()
-                    .filter(f -> cols.contains(f.getPath().toDotString()))
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
+        (Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential()
+          .flatMap(blockMetaData ->
+              blockMetaData.getColumns().stream()
+                .filter(f -> cols.contains(f.getPath().toDotString()))
                 .map(columnChunkMetaData ->
-                    new HoodieColumnRangeMetadata<Comparable>(
+                    HoodieColumnRangeMetadata.<Comparable>create(
                         parquetFilePath.getName(),
                         columnChunkMetaData.getPath().toDotString(),
                         convertToNativeJavaType(
@@ -312,7 +322,8 @@ public class ParquetUtils extends BaseFileUtils {
                         columnChunkMetaData.getValueCount(),
                         columnChunkMetaData.getTotalSize(),
                         columnChunkMetaData.getTotalUncompressedSize()))
-            ).collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+          )
+          .collect(groupingByCollector);
 
     // Combine those into file-level statistics
     // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
@@ -360,7 +371,7 @@ public class ParquetUtils extends BaseFileUtils {
       maxValue = one.getMaxValue();
     }
 
-    return new HoodieColumnRangeMetadata<T>(
+    return HoodieColumnRangeMetadata.create(
         one.getFilePath(),
         one.getColumnName(), minValue, maxValue,
         one.getNullCount() + another.getNullCount(),
@@ -369,7 +380,11 @@ public class ParquetUtils extends BaseFileUtils {
         one.getTotalUncompressedSize() + another.getTotalUncompressedSize());
   }
 
-  private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) {
+  private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable<?> val) {
+    if (val == null) {
+      return null;
+    }
+
     if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
       return extractDecimal(val, primitiveType.getDecimalMetadata());
     } else if (primitiveType.getOriginalType() == OriginalType.DATE) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 4390e87..4d6c602 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -960,8 +960,7 @@ public class HoodieTableMetadataUtil {
       } else {
         // TODO we should delete records instead of stubbing them
         columnRangeMetadataList =
-            columnsToIndex.stream().map(entry -> new HoodieColumnRangeMetadata<Comparable>(fileName,
-                    entry, null, null, 0, 0, 0, 0))
+            columnsToIndex.stream().map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
                 .collect(Collectors.toList());
       }
       return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted);
@@ -1012,11 +1011,11 @@ public class HoodieTableMetadataUtil {
                                             Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
                                             Map<String, Map<String, Object>> columnToStats) {
     Map<String, Object> columnStats = columnToStats.get(field.name());
-    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = HoodieColumnRangeMetadata.create(
         filePath,
         field.name(),
-        String.valueOf(columnStats.get(MIN)),
-        String.valueOf(columnStats.get(MAX)),
+        (Comparable) String.valueOf(columnStats.get(MIN)),
+        (Comparable) String.valueOf(columnStats.get(MAX)),
         Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
         Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
         Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 832d942..af0c100 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -780,7 +780,7 @@ public class HoodieMetadataTableValidator implements Serializable {
         return allColumnNameList.stream()
             .flatMap(columnName ->
                 tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
-                    .map(stats -> new HoodieColumnRangeMetadata<>(
+                    .map(stats -> HoodieColumnRangeMetadata.create(
                         stats.getFileName(),
                         columnName,
                         stats.getMinValue(),
@@ -799,7 +799,7 @@ public class HoodieMetadataTableValidator implements Serializable {
                     metaClient.getHadoopConf(),
                     new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
                     allColumnNameList).stream())
-            .map(rangeMetadata -> new HoodieColumnRangeMetadata<String>(
+            .map(rangeMetadata -> HoodieColumnRangeMetadata.create(
                 rangeMetadata.getFilePath(),
                 rangeMetadata.getColumnName(),
                 // Note: here we ignore the type in the validation,