Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:48 UTC
[hudi] 38/45: [MINOR] add integrity check of parquet file for HoodieRowDataParquetWriter.
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0d71705ec2276f6524ab97e765563f6c902f35d9
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Fri Dec 9 12:39:39 2022 +0800
[MINOR] add integrity check of parquet file for HoodieRowDataParquetWriter.
---
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 22 ++++++----------------
.../src/main/java/org/apache/hudi/io/IOUtils.java | 14 ++++++++++++++
.../io/storage/row/HoodieRowDataParquetWriter.java | 2 ++
3 files changed, 22 insertions(+), 16 deletions(-)
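In short, this commit lifts the parquet footer check that previously lived inline in HoodieMergeHandle into a shared IOUtils helper, so the Flink HoodieRowDataParquetWriter can run the same validation when it closes a file. A minimal usage sketch of the new helper follows; the Configuration and file path are hypothetical and not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.io.IOUtils;

    public class ParquetCheckExample {
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        // Hypothetical path: only files with the ".parquet" extension are checked.
        Path filePath = new Path("/tmp/hudi/some-file.parquet");
        // No checked exception is declared on the helper, so an unreadable
        // footer (e.g. a file truncated by a failed write) surfaces as a
        // runtime exception at this call.
        IOUtils.checkParquetFileValid(hadoopConf, filePath);
      }
    }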
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 88db25bac4..c569acdda6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -18,6 +18,10 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -34,7 +38,6 @@ import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -47,27 +50,19 @@ import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.concurrent.NotThreadSafe;
-
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
-import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
-
@SuppressWarnings("Duplicates")
/**
* Handle to merge incoming records to those in storage.
@@ -450,12 +445,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
return;
}
- // Fast verify the integrity of the parquet file.
- // only check the readable of parquet metadata.
- final String extension = FSUtils.getFileExtension(newFilePath.toString());
- if (PARQUET.getFileExtension().equals(extension)) {
- new ParquetUtils().readMetadata(hoodieTable.getHadoopConf(), newFilePath);
- }
+ IOUtils.checkParquetFileValid(hoodieTable.getHadoopConf(), newFilePath);
long oldNumWrites = 0;
try {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
index 7636384c3a..b231136ece 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
@@ -18,11 +18,16 @@
package org.apache.hudi.io;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION;
@@ -72,4 +77,13 @@ public class IOUtils {
String fraction = hoodieConfig.getStringOrDefault(MAX_MEMORY_FRACTION_FOR_COMPACTION);
return getMaxMemoryAllowedForMerge(context, fraction);
}
+
+ public static void checkParquetFileValid(Configuration hadoopConf, Path filePath) {
+ // Quickly verify the integrity of the parquet file:
+ // only check that its footer metadata is readable.
+ final String extension = FSUtils.getFileExtension(filePath.toString());
+ if (PARQUET.getFileExtension().equals(extension)) {
+ new ParquetUtils().readMetadata(hadoopConf, filePath);
+ }
+ }
}
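For context, the body of ParquetUtils.readMetadata is not shown in this diff; the fast check amounts to parsing the file footer and nothing more. A rough standalone equivalent, assuming only the stock parquet-hadoop API (this sketch is illustrative, not the actual Hudi implementation):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class FooterCheckSketch {
      public static void checkFooter(Configuration hadoopConf, Path filePath) throws IOException {
        // Opening the reader parses the footer; a truncated or corrupt file
        // fails here without scanning any row groups, which keeps the check cheap.
        try (ParquetFileReader reader =
                 ParquetFileReader.open(HadoopInputFile.fromPath(filePath, hadoopConf))) {
          ParquetMetadata footer = reader.getFooter();
          // Reaching this point means the footer was parseable.
        }
      }
    }

The third file in this commit wires the check into HoodieRowDataParquetWriter.close(), after super.close() has flushed the footer, so a partial file is caught at write time rather than by a later reader.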
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
index 17b3b6b37c..fd1edaab84 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.io.storage.row;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.flink.table.data.RowData;
@@ -74,5 +75,6 @@ public class HoodieRowDataParquetWriter extends ParquetWriter<RowData>
@Override
public void close() throws IOException {
super.close();
+ IOUtils.checkParquetFileValid(fs.getConf(), file);
}
}