Posted to commits@hudi.apache.org by "lanyuanxiaoyao (Jira)" <ji...@apache.org> on 2022/04/30 08:25:00 UTC
[jira] [Created] (HUDI-4003) Flink offline compaction cause NPE when log file is full of delete operation
lanyuanxiaoyao created HUDI-4003:
------------------------------------
Summary: Flink offline compaction cause NPE when log file is full of delete operation
Key: HUDI-4003
URL: https://issues.apache.org/jira/browse/HUDI-4003
Project: Apache Hudi
Issue Type: Bug
Components: compaction, flink
Affects Versions: 0.11.1
Reporter: lanyuanxiaoyao
Environment: Hudi 0.12.0 (Latest master), Flink 1.13.3, JDK 8
My test:
# Create two partitions, p1 and p2.
# Write data so that p1 receives only delete records and p2 receives only update records.
# Run offline compaction; it throws an NPE:
{code:java}
Exception in thread "main" java.lang.NullPointerException
at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:264)
at org.apache.hudi.common.table.TableSchemaResolver.convertParquetSchemaToAvro(TableSchemaResolver.java:341)
at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:148)
at org.apache.hudi.util.CompactionUtil.inferChangelogMode(CompactionUtil.java:131)
at org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.<init>(HoodieFlinkCompactor.java:173)
at com.lanyuanxiaoyao.Compactor.main(Compactor.java:25) {code}
Reason & Resolution:
# Flink offline compaction reads the schema of the latest data file to check whether the '_hoodie_operation' field is set (org.apache.hudi.util.CompactionUtil#inferChangelogMode).
# For a MOR table, that file can be any file listed in the commit metadata, including a log file, and which one is picked is arbitrary. If the picked log file contains only delete operations, reading its schema returns NULL (org.apache.hudi.common.table.TableSchemaResolver#readSchemaFromLogFile(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)).
# Finally, the NULL schema is passed on, and the code throws an NPE when it tries to read the schema name (org.apache.parquet.avro.AvroSchemaConverter#convert(org.apache.parquet.schema.MessageType)).
{code:java}
case MERGE_ON_READ:
  // For MOR table, the file has data written may be a parquet file, .log file, orc file or hfile.
  // Determine the file format based on the file name, and then extract schema from it.
  if (instantAndCommitMetadata.isPresent()) {
    HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
    String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
    if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
      // this is a log file
      return readSchemaFromLogFile(new Path(filePath));
    } else {
      return readSchemaFromBaseFile(filePath);
    }
  }
{code}
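The failure path described above can be modeled without Hudi or Flink. In this minimal sketch, `PickAnyFileSketch`, `convert`, and `resolveFrom` are hypothetical stand-ins: a `Map` from file path to schema plays the role of the commit's file list, a `String` stands in for the Parquet `MessageType`, and a `null` value stands in for a delete-only log file. Because `Stream.findAny()` is free to return any element, the sketch simply shows the outcome for each file that could be picked.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PickAnyFileSketch {

    // Hypothetical stand-in for AvroSchemaConverter#convert, which reads the
    // schema name; dereferencing a null schema throws NullPointerException.
    static String convert(String schema) {
        return "avro:" + schema.toString();
    }

    // Models picking a single file from the commit and converting its schema.
    static String resolveFrom(Map<String, String> schemaByFile, String pickedFile) {
        String schema = schemaByFile.get(pickedFile); // null for a delete-only log file
        try {
            return convert(schema);
        } catch (NullPointerException e) {
            return "NPE";
        }
    }

    public static void main(String[] args) {
        // Hypothetical file names mirroring the test in this issue.
        Map<String, String> schemaByFile = new LinkedHashMap<>();
        schemaByFile.put("p1/deletes-only.log", null);
        schemaByFile.put("p2/updates.log", "record_schema");

        // Either file could be the one returned by findAny(), so show both outcomes.
        for (String file : schemaByFile.keySet()) {
            System.out.println(file + " -> " + resolveFrom(schemaByFile, file));
        }
    }
}
```

This is why the NPE can appear on one run and not on another: the result depends only on which file happens to be picked.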
I think the code could try another file to parse the schema when it gets NULL from a log file.
My solution makes the code scan all the file paths in the commit metadata, parsing each one until a schema is found:
{code:java}
case MERGE_ON_READ:
  // For MOR table, the file has data written may be a parquet file, .log file, orc file or hfile.
  // Determine the file format based on the file name, and then extract schema from it.
  if (instantAndCommitMetadata.isPresent()) {
    HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
    Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
    MessageType type = null;
    // Keep trying files until one yields a schema (e.g. skip delete-only log files).
    while (filePaths.hasNext() && type == null) {
      String filePath = filePaths.next();
      if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
        // this is a log file
        type = readSchemaFromLogFile(new Path(filePath));
      } else {
        type = readSchemaFromBaseFile(filePath);
      }
    }
    // type is still null here if no file in the commit yielded a schema.
    return type;
  }
{code}
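The proposed loop amounts to "return the schema of the first file that yields one". The Java sketch below isolates that selection logic; `SchemaFallbackSketch`, `firstSchema`, and the `Function<String, String>` reader are hypothetical stand-ins, with a `String` in place of the Parquet `MessageType` that the real Hudi methods return. One difference from the loop above is an assumption of this sketch, not part of the proposal: it returns `Optional.empty()` when every file yields no schema, so the caller has to decide how to fail instead of passing `null` on to the converter.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class SchemaFallbackSketch {

    // Hypothetical helper: try each candidate file in order and return the first
    // schema the reader produces. The reader returns null for a file without a
    // schema, e.g. a log file holding only delete records.
    static Optional<String> firstSchema(Iterable<String> filePaths, Function<String, String> reader) {
        Iterator<String> it = filePaths.iterator();
        while (it.hasNext()) {
            String schema = reader.apply(it.next());
            if (schema != null) {
                return Optional.of(schema);
            }
        }
        // Every candidate came back empty; let the caller choose how to fail.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical commit contents mirroring the test in this issue.
        Map<String, String> schemaByFile = new HashMap<>();
        schemaByFile.put("p1/deletes-only.log", null);
        schemaByFile.put("p2/updates.log", "record_schema");
        List<String> files = Arrays.asList("p1/deletes-only.log", "p2/updates.log");

        Optional<String> schema = firstSchema(files, schemaByFile::get);
        System.out.println(schema.orElseThrow(() ->
            new IllegalStateException("no file in the commit carries a schema")));
    }
}
```

Returning an `Optional` makes the "all files are delete-only" case explicit at the call site, which the `null`-returning loop leaves to whoever consumes `type`.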