You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/03/24 12:08:08 UTC
[hudi] branch master updated: [HUDI-3684] Fixing NPE in `ParquetUtils` (#5102)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ccc3728 [HUDI-3684] Fixing NPE in `ParquetUtils` (#5102)
ccc3728 is described below
commit ccc3728002533978cb35b3b4c22cb4d0ef087347
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Mar 24 05:07:38 2022 -0700
[HUDI-3684] Fixing NPE in `ParquetUtils` (#5102)
* Make sure nulls are properly handled in `HoodieColumnRangeMetadata`
---
.../common/model/HoodieColumnRangeMetadata.java | 58 ++++++++++++++++++----
.../org/apache/hudi/common/util/ParquetUtils.java | 29 ++++++++---
.../hudi/metadata/HoodieTableMetadataUtil.java | 9 ++--
.../utilities/HoodieMetadataTableValidator.java | 4 +-
4 files changed, 75 insertions(+), 25 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
index d098c4f..2afbd19 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
@@ -18,19 +18,27 @@
package org.apache.hudi.common.model;
+import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.function.BiFunction;
+import java.util.stream.Stream;
/**
- * Hoodie Range metadata.
+ * Hoodie metadata for the column range of data stored in columnar format (like Parquet)
+ *
+ * NOTE: {@link Comparable} is used as a raw type so that we can handle polymorphism, where
+ * the caller is not aware a priori of the type {@link HoodieColumnRangeMetadata} is
+ * associated with
*/
-public class HoodieColumnRangeMetadata<T> implements Serializable {
+@SuppressWarnings("rawtype")
+public class HoodieColumnRangeMetadata<T extends Comparable> implements Serializable {
private final String filePath;
private final String columnName;
+ @Nullable
private final T minValue;
+ @Nullable
private final T maxValue;
private final long nullCount;
private final long valueCount;
@@ -38,21 +46,30 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
private final long totalUncompressedSize;
public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION =
- (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<>(
+ (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<Comparable>(
newColumnRange.getFilePath(),
newColumnRange.getColumnName(),
- (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
- .stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null),
- (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
- .stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null),
+ (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) // merged lower bound: smallest non-null min, null when both are null
+ .filter(Objects::nonNull)
+ .min(Comparator.naturalOrder())
+ .orElse(null),
+ (Comparable) Stream.of(oldColumnRange.getMaxValue(), newColumnRange.getMaxValue()) // merged upper bound: largest non-null max (must read the max values, not the mins)
+ .filter(Objects::nonNull)
+ .max(Comparator.naturalOrder()).orElse(null),
oldColumnRange.getNullCount() + newColumnRange.getNullCount(),
oldColumnRange.getValueCount() + newColumnRange.getValueCount(),
oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(),
oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize()
);
- public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue,
- final long nullCount, long valueCount, long totalSize, long totalUncompressedSize) {
+ private HoodieColumnRangeMetadata(String filePath,
+ String columnName,
+ @Nullable T minValue,
+ @Nullable T maxValue,
+ long nullCount,
+ long valueCount,
+ long totalSize,
+ long totalUncompressedSize) {
this.filePath = filePath;
this.columnName = columnName;
this.minValue = minValue;
@@ -71,10 +88,12 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
return this.columnName;
}
+ @Nullable
public T getMinValue() {
return this.minValue;
}
+ @Nullable
public T getMaxValue() {
return this.maxValue;
}
@@ -133,6 +152,23 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
+ '}';
}
+ public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> create(String filePath,
+ String columnName,
+ @Nullable T minValue,
+ @Nullable T maxValue,
+ long nullCount,
+ long valueCount,
+ long totalSize,
+ long totalUncompressedSize) {
+ return new HoodieColumnRangeMetadata<>(filePath, columnName, minValue, maxValue, nullCount, valueCount, totalSize, totalUncompressedSize);
+ }
+
+ @SuppressWarnings("rawtypes") // "rawtypes" is the key javac recognizes; "rawtype" suppresses nothing
+ public static HoodieColumnRangeMetadata<Comparable> stub(String filePath,
+ String columnName) {
+ return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1); // placeholder entry: no min/max, every counter set to -1
+ }
+
/**
* Statistics that is collected in {@link org.apache.hudi.metadata.MetadataPartitionType#COLUMN_STATS} index.
*/
@@ -144,6 +180,6 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
public static final String TOTAL_SIZE = "total_size";
public static final String TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size";
- private Stats() { }
+ private Stats() {}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e74f4f7..c0f7aab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -58,6 +58,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
+import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -288,18 +289,27 @@ public class ParquetUtils extends BaseFileUtils {
/**
* Parse min/max statistics stored in parquet footers for all columns.
*/
+ @SuppressWarnings("rawtype")
public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
@Nonnull Configuration conf,
@Nonnull Path parquetFilePath,
@Nonnull List<String> cols
) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
+
+ // NOTE: This collector has to have fully specialized generic type params since
+ // Java 1.8 struggles to infer them
+ Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector =
+ Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);
+
// Collect stats from all individual Parquet blocks
- Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().sequential()
- .flatMap(blockMetaData -> blockMetaData.getColumns().stream()
- .filter(f -> cols.contains(f.getPath().toDotString()))
+ Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
+ (Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential()
+ .flatMap(blockMetaData ->
+ blockMetaData.getColumns().stream()
+ .filter(f -> cols.contains(f.getPath().toDotString()))
.map(columnChunkMetaData ->
- new HoodieColumnRangeMetadata<Comparable>(
+ HoodieColumnRangeMetadata.<Comparable>create(
parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
convertToNativeJavaType(
@@ -312,7 +322,8 @@ public class ParquetUtils extends BaseFileUtils {
columnChunkMetaData.getValueCount(),
columnChunkMetaData.getTotalSize(),
columnChunkMetaData.getTotalUncompressedSize()))
- ).collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
+ )
+ .collect(groupingByCollector);
// Combine those into file-level statistics
// NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
@@ -360,7 +371,7 @@ public class ParquetUtils extends BaseFileUtils {
maxValue = one.getMaxValue();
}
- return new HoodieColumnRangeMetadata<T>(
+ return HoodieColumnRangeMetadata.create(
one.getFilePath(),
one.getColumnName(), minValue, maxValue,
one.getNullCount() + another.getNullCount(),
@@ -369,7 +380,11 @@ public class ParquetUtils extends BaseFileUtils {
one.getTotalUncompressedSize() + another.getTotalUncompressedSize());
}
- private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) {
+ private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable<?> val) {
+ if (val == null) {
+ return null;
+ }
+
if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
return extractDecimal(val, primitiveType.getDecimalMetadata());
} else if (primitiveType.getOriginalType() == OriginalType.DATE) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 4390e87..4d6c602 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -960,8 +960,7 @@ public class HoodieTableMetadataUtil {
} else {
// TODO we should delete records instead of stubbing them
columnRangeMetadataList =
- columnsToIndex.stream().map(entry -> new HoodieColumnRangeMetadata<Comparable>(fileName,
- entry, null, null, 0, 0, 0, 0))
+ columnsToIndex.stream().map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
.collect(Collectors.toList());
}
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted);
@@ -1012,11 +1011,11 @@ public class HoodieTableMetadataUtil {
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
Map<String, Map<String, Object>> columnToStats) {
Map<String, Object> columnStats = columnToStats.get(field.name());
- HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>(
+ HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = HoodieColumnRangeMetadata.create(
filePath,
field.name(),
- String.valueOf(columnStats.get(MIN)),
- String.valueOf(columnStats.get(MAX)),
+ (Comparable) String.valueOf(columnStats.get(MIN)),
+ (Comparable) String.valueOf(columnStats.get(MAX)),
Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 832d942..af0c100 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -780,7 +780,7 @@ public class HoodieMetadataTableValidator implements Serializable {
return allColumnNameList.stream()
.flatMap(columnName ->
tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
- .map(stats -> new HoodieColumnRangeMetadata<>(
+ .map(stats -> HoodieColumnRangeMetadata.create(
stats.getFileName(),
columnName,
stats.getMinValue(),
@@ -799,7 +799,7 @@ public class HoodieMetadataTableValidator implements Serializable {
metaClient.getHadoopConf(),
new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
allColumnNameList).stream())
- .map(rangeMetadata -> new HoodieColumnRangeMetadata<String>(
+ .map(rangeMetadata -> HoodieColumnRangeMetadata.create(
rangeMetadata.getFilePath(),
rangeMetadata.getColumnName(),
// Note: here we ignore the type in the validation,