Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/09 01:15:59 UTC

[hudi] branch master updated: [HUDI-2247] Filter file where length less than parquet MAGIC length (#3363)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 11ea749  [HUDI-2247] Filter file where length less than parquet MAGIC length (#3363)
11ea749 is described below

commit 11ea74958d2aadffb3abe757cface5e2835784db
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Mon Aug 9 09:15:42 2021 +0800

    [HUDI-2247] Filter file where length less than parquet MAGIC length (#3363)
    
    Co-authored-by: 喻兆靖 <yu...@bilibili.com>
---
 .../hudi/sink/bootstrap/BootstrapFunction.java     |  5 +++--
 .../sink/partitioner/profile/WriteProfiles.java    |  3 ++-
 .../java/org/apache/hudi/util/StreamerUtil.java    | 24 ++++++++++++++++++++++
 3 files changed, 29 insertions(+), 3 deletions(-)
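
The patch replaces the previous "file length > 0" guards with a check against the length of each format's magic bytes: a file that is not even longer than its magic cannot be a complete parquet, ORC, or Hudi log file, so it is treated as a crashed partial write and skipped up front. A minimal standalone sketch of that comparison for a parquet file (illustration only, not part of the commit), using the ParquetFileWriter.MAGIC constant that the patch itself references:

    import org.apache.parquet.hadoop.ParquetFileWriter;

    public class MagicLengthSketch {
      public static void main(String[] args) {
        // leftover of a partially flushed parquet write
        long crashedFileLen = 2L;
        // previous guard: any non-empty file passes, and only fails later in the reader
        boolean oldCheck = crashedFileLen > 0;
        // new guard: the file must be longer than the 4-byte "PAR1" magic
        boolean newCheck = crashedFileLen > ParquetFileWriter.MAGIC.length;
        System.out.println("old=" + oldCheck + ", new=" + newCheck); // old=true, new=false
      }
    }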

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index 0a4ef30..bcf6c3a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -57,6 +57,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.util.StreamerUtil.isValidFile;
 
 /**
  * The function to load index from existing hoodieTable.
@@ -179,7 +180,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
         // load parquet records
         fileSlice.getBaseFile().ifPresent(baseFile -> {
           // filter out crushed files
-          if (baseFile.getFileSize() <= 0) {
+          if (!isValidFile(baseFile.getFileStatus())) {
             return;
           }
 
@@ -199,7 +200,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
         // load avro log records
         List<String> logPaths = fileSlice.getLogFiles()
                 // filter out crushed files
-                .filter(logFile -> logFile.getFileSize() > 0)
+                .filter(logFile -> isValidFile(logFile.getFileStatus()))
                 .map(logFile -> logFile.getPath().toString())
                 .collect(toList());
         HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
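
The two hunks above apply the same validity check to the base file and the log files of a file slice. As an illustration only (not from the commit), the log-file filter could be pulled out into a small helper, assuming the FileSlice and HoodieLogFile accessors already used above:

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hudi.common.model.FileSlice;

    import static org.apache.hudi.util.StreamerUtil.isValidFile;

    public class FileSliceFilters {
      // Log file paths in the slice that are long enough to contain actual data blocks.
      public static List<String> validLogPaths(FileSlice fileSlice) {
        return fileSlice.getLogFiles()
            // skip crashed writes shorter than the log format's magic bytes
            .filter(logFile -> isValidFile(logFile.getFileStatus()))
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList());
      }
    }
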
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 1277b19..1fcf3f1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.hadoop.conf.Configuration;
@@ -131,7 +132,7 @@ public class WriteProfiles {
         })
         // filter out crushed files
         .filter(Objects::nonNull)
-        .filter(fileStatus -> fileStatus.getLen() > 0)
+        .filter(StreamerUtil::isValidFile)
         .collect(Collectors.toList());
   }
 
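Here the file statuses collected from commit metadata are filtered with a method reference instead of a raw length comparison. A hypothetical helper (not part of the commit) that applies the same predicate to a plain directory listing, assuming a configured Hadoop FileSystem:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import org.apache.hudi.util.StreamerUtil;

    public class PartitionFileListing {
      // Files under a partition path that look readable; crashed writes are dropped up front.
      public static List<FileStatus> listValidFiles(FileSystem fs, Path partitionPath) throws IOException {
        return Arrays.stream(fs.listStatus(partitionPath))
            .filter(StreamerUtil::isValidFile)
            .collect(Collectors.toList());
      }
    }
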
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 3d53a07..86de90e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -51,9 +52,12 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.orc.OrcFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +72,9 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
 
+import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
+import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP;
 
 /**
@@ -348,6 +355,23 @@ public class StreamerUtil {
     }
   }
 
+  public static boolean isValidFile(FileStatus fileStatus) {
+    final String extension = FSUtils.getFileExtension(fileStatus.getPath().toString());
+    if (PARQUET.getFileExtension().equals(extension)) {
+      return fileStatus.getLen() > ParquetFileWriter.MAGIC.length;
+    }
+
+    if (ORC.getFileExtension().equals(extension)) {
+      return fileStatus.getLen() > OrcFile.MAGIC.length();
+    }
+
+    if (HOODIE_LOG.getFileExtension().equals(extension)) {
+      return fileStatus.getLen() > HoodieLogFormat.MAGIC.length;
+    }
+
+    return fileStatus.getLen() > 0;
+  }
+  
   public static boolean allowDuplicateInserts(Configuration conf) {
     WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
     return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
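
For completeness, a small hypothetical demonstration of the new StreamerUtil.isValidFile helper (not part of the commit; the file names, sizes, and block sizes below are made up) showing that a truncated parquet file is rejected while a normally sized one still passes:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    import org.apache.hudi.util.StreamerUtil;

    public class IsValidFileDemo {
      public static void main(String[] args) {
        // 2-byte leftover of a crashed write: shorter than the 4-byte parquet magic
        FileStatus crushed = new FileStatus(2L, false, 1, 134217728L, 0L,
            new Path("/tmp/hoodie/2021/08/09/f1_1-0-1_20210809091542.parquet"));
        // a plausibly complete base file
        FileStatus healthy = new FileStatus(4096L, false, 1, 134217728L, 0L,
            new Path("/tmp/hoodie/2021/08/09/f2_1-0-1_20210809091542.parquet"));

        System.out.println(StreamerUtil.isValidFile(crushed)); // false: filtered as a crushed file
        System.out.println(StreamerUtil.isValidFile(healthy)); // true
      }
    }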