You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/09 01:15:59 UTC
[hudi] branch master updated: [HUDI-2247] Filter file where length is
less than parquet MAGIC length (#3363)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 11ea749 [HUDI-2247] Filter file where length is less than parquet MAGIC length (#3363)
11ea749 is described below
commit 11ea74958d2aadffb3abe757cface5e2835784db
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Mon Aug 9 09:15:42 2021 +0800
[HUDI-2247] Filter file where length is less than parquet MAGIC length (#3363)
Co-authored-by: 喻兆靖 <yu...@bilibili.com>
---
.../hudi/sink/bootstrap/BootstrapFunction.java | 5 +++--
.../sink/partitioner/profile/WriteProfiles.java | 3 ++-
.../java/org/apache/hudi/util/StreamerUtil.java | 24 ++++++++++++++++++++++
3 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index 0a4ef30..bcf6c3a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -57,6 +57,7 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.util.StreamerUtil.isValidFile;
/**
* The function to load index from existing hoodieTable.
@@ -179,7 +180,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
// load parquet records
fileSlice.getBaseFile().ifPresent(baseFile -> {
// filter out crushed files
- if (baseFile.getFileSize() <= 0) {
+ if (!isValidFile(baseFile.getFileStatus())) {
return;
}
@@ -199,7 +200,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
// load avro log records
List<String> logPaths = fileSlice.getLogFiles()
// filter out crushed files
- .filter(logFile -> logFile.getFileSize() > 0)
+ .filter(logFile -> isValidFile(logFile.getFileStatus()))
.map(logFile -> logFile.getPath().toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 1277b19..1fcf3f1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
@@ -131,7 +132,7 @@ public class WriteProfiles {
})
// filter out crushed files
.filter(Objects::nonNull)
- .filter(fileStatus -> fileStatus.getLen() > 0)
+ .filter(StreamerUtil::isValidFile)
.collect(Collectors.toList());
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 3d53a07..86de90e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -51,9 +52,12 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.orc.OrcFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +72,9 @@ import java.util.List;
import java.util.Locale;
import java.util.Properties;
+import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
+import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP;
/**
@@ -348,6 +355,23 @@ public class StreamerUtil {
}
}
+ public static boolean isValidFile(FileStatus fileStatus) {
+ final String extension = FSUtils.getFileExtension(fileStatus.getPath().toString());
+ if (PARQUET.getFileExtension().equals(extension)) {
+ return fileStatus.getLen() > ParquetFileWriter.MAGIC.length;
+ }
+
+ if (ORC.getFileExtension().equals(extension)) {
+ return fileStatus.getLen() > OrcFile.MAGIC.length();
+ }
+
+ if (HOODIE_LOG.getFileExtension().equals(extension)) {
+ return fileStatus.getLen() > HoodieLogFormat.MAGIC.length;
+ }
+
+ return fileStatus.getLen() > 0;
+ }
+
public static boolean allowDuplicateInserts(Configuration conf) {
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);