Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:48 UTC

[hudi] 38/45: [MINOR] add integrity check of parquet file for HoodieRowDataParquetWriter.

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0d71705ec2276f6524ab97e765563f6c902f35d9
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Fri Dec 9 12:39:39 2022 +0800

    [MINOR] add integrity check of parquet file for HoodieRowDataParquetWriter.
---
 .../java/org/apache/hudi/io/HoodieMergeHandle.java | 22 ++++++----------------
 .../src/main/java/org/apache/hudi/io/IOUtils.java  | 14 ++++++++++++++
 .../io/storage/row/HoodieRowDataParquetWriter.java |  2 ++
 3 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 88db25bac4..c569acdda6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.io;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
@@ -34,7 +38,6 @@ import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -47,27 +50,19 @@ import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.HoodieTable;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import javax.annotation.concurrent.NotThreadSafe;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
-import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
-
 @SuppressWarnings("Duplicates")
 /**
  * Handle to merge incoming records to those in storage.
@@ -450,12 +445,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       return;
     }
 
-    // Fast verify the integrity of the parquet file.
-    // only check the readable of parquet metadata.
-    final String extension = FSUtils.getFileExtension(newFilePath.toString());
-    if (PARQUET.getFileExtension().equals(extension)) {
-      new ParquetUtils().readMetadata(hoodieTable.getHadoopConf(), newFilePath);
-    }
+    IOUtils.checkParquetFileVaid(hoodieTable.getHadoopConf(), newFilePath);
 
     long oldNumWrites = 0;
     try {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
index 7636384c3a..b231136ece 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
@@ -18,11 +18,16 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
 
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
 import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
 import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION;
@@ -72,4 +77,13 @@ public class IOUtils {
     String fraction = hoodieConfig.getStringOrDefault(MAX_MEMORY_FRACTION_FOR_COMPACTION);
     return getMaxMemoryAllowedForMerge(context, fraction);
   }
+
+  public static void checkParquetFileVaid(Configuration hadoopConf, Path filePath) {
+    // Fast verify the integrity of the parquet file.
+    // only check the readable of parquet metadata.
+    final String extension = FSUtils.getFileExtension(filePath.toString());
+    if (PARQUET.getFileExtension().equals(extension)) {
+      new ParquetUtils().readMetadata(hadoopConf, filePath);
+    }
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
index 17b3b6b37c..fd1edaab84 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.io.storage.row;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 
 import org.apache.flink.table.data.RowData;
@@ -74,5 +75,6 @@ public class HoodieRowDataParquetWriter extends ParquetWriter<RowData>
   @Override
   public void close() throws IOException {
     super.close();
+    IOUtils.checkParquetFileVaid(fs.getConf(), file);
   }
 }
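
For readers skimming the patch: the refactor moves the footer check out of HoodieMergeHandle into a shared IOUtils helper, and the Flink row-data writer now runs the same check in close() after the file has been written. The check is deliberately cheap: it only attempts to parse the Parquet footer metadata, which is enough to catch truncated or corrupt base files without scanning any row groups. Below is a standalone sketch of that idea using only the classes visible in the diff; the class name ParquetIntegrityCheckExample and the sample path are illustrative and not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.ParquetUtils;

import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;

// Illustrative sketch only; mirrors IOUtils.checkParquetFileVaid introduced in this commit.
public class ParquetIntegrityCheckExample {

  // Throws a runtime exception if the footer cannot be read,
  // i.e. the file was truncated or corrupted during the write.
  static void verifyParquetFooter(Configuration hadoopConf, Path filePath) {
    final String extension = FSUtils.getFileExtension(filePath.toString());
    // Only base parquet files are probed; other extensions (e.g. log files) are skipped.
    if (PARQUET.getFileExtension().equals(extension)) {
      // Reads footer metadata only, which is cheap relative to scanning the data pages.
      new ParquetUtils().readMetadata(hadoopConf, filePath);
    }
  }

  public static void main(String[] args) {
    // Hypothetical path, for demonstration only.
    verifyParquetFooter(new Configuration(), new Path("/tmp/hoodie/2023/01/04/example.parquet"));
  }
}

Note that in HoodieRowDataParquetWriter the helper is invoked after super.close(), so the footer has already been flushed through the wrapper filesystem before it is probed.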