Posted to commits@hudi.apache.org by "lanyuanxiaoyao (Jira)" <ji...@apache.org> on 2022/04/30 08:30:00 UTC
[jira] [Updated] (HUDI-4003) Flink offline compaction may cause NPE when log file only contains delete operation
[ https://issues.apache.org/jira/browse/HUDI-4003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lanyuanxiaoyao updated HUDI-4003:
---------------------------------
Summary: Flink offline compaction may cause NPE when log file only contains delete operation (was: Flink offline compaction cause NPE when log file is full of delete operation)
> Flink offline compaction may cause NPE when log file only contains delete operation
> -----------------------------------------------------------------------------------
>
> Key: HUDI-4003
> URL: https://issues.apache.org/jira/browse/HUDI-4003
> Project: Apache Hudi
> Issue Type: Bug
> Components: compaction, flink
> Affects Versions: 0.11.1
> Reporter: lanyuanxiaoyao
> Priority: Major
>
> Environment: Hudi 0.12.0 (Latest master), Flink 1.13.3, JDK 8
> My test:
> # Two partitions: p1, p2
> # Write data such that p1 receives only delete records and p2 receives only update records
> # Run offline compaction, which throws the NPE below (a minimal driver sketch follows the stack trace)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>     at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:264)
>     at org.apache.hudi.common.table.TableSchemaResolver.convertParquetSchemaToAvro(TableSchemaResolver.java:341)
>     at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:148)
>     at org.apache.hudi.util.CompactionUtil.inferChangelogMode(CompactionUtil.java:131)
>     at org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.<init>(HoodieFlinkCompactor.java:173)
>     at com.lanyuanxiaoyao.Compactor.main(Compactor.java:25) {code}
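> To make the reproduction concrete, here is a minimal driver sketch reconstructed from the stack trace above. The class and the table path are my assumptions; HoodieFlinkCompactor#main and its --path option are Hudi's standard offline compaction entry point.
> {code:java}
> package com.lanyuanxiaoyao;
>
> import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
>
> public class Compactor {
>   public static void main(String[] args) throws Exception {
>     // Launch Hudi's Flink offline compaction against a MOR table whose latest
>     // commit wrote a log file containing only delete blocks (partition p1 above).
>     // The NPE is thrown while the service infers the changelog mode on startup.
>     HoodieFlinkCompactor.main(new String[] {"--path", "hdfs:///tmp/hudi/t1"});
>   }
> } {code}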
> Reason & Resolution:
> # Flink offline compaction reads the schema from the latest data file to check whether the '_hoodie_operation' field has been set (org.apache.hudi.util.CompactionUtil#inferChangelogMode).
> # For a MOR table, it may read the schema from a randomly chosen log file. If the chosen log file contains only delete operations, the read returns NULL (org.apache.hudi.common.table.TableSchemaResolver#readSchemaFromLogFile(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)); a paraphrased sketch of that method follows the snippet below.
> # Finally, the NPE is thrown when the code tries to read the name of that NULL schema (org.apache.parquet.avro.AvroSchemaConverter#convert(org.apache.parquet.schema.MessageType)).
> {code:java}
> case MERGE_ON_READ:
>   // For a MOR table, the file the data was written to may be a parquet file, a .log file, an orc file or an hfile.
>   // Determine the file format based on the file name, then extract the schema from it.
>   if (instantAndCommitMetadata.isPresent()) {
>     HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
>     String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
>     if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
>       // this is a log file
>       return readSchemaFromLogFile(new Path(filePath));
>     } else {
>       return readSchemaFromBaseFile(filePath);
>     }
>   } {code}
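> For reference, this is roughly why the NULL appears (a paraphrased sketch of TableSchemaResolver#readSchemaFromLogFile, not the verbatim Hudi source):
> {code:java}
> // The schema is taken from the last *data* block in the log file. A log file
> // written only by delete operations contains delete blocks and no data block,
> // so lastBlock stays null and the method returns null to the caller above.
> try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null)) {
>   HoodieDataBlock lastBlock = null;
>   while (reader.hasNext()) {
>     HoodieLogBlock block = reader.next();
>     if (block instanceof HoodieDataBlock) {
>       lastBlock = (HoodieDataBlock) block;
>     }
>   }
>   return lastBlock == null ? null : new AvroSchemaConverter().convert(lastBlock.getSchema());
> } {code}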
> I think the code can try another log file to parse the schema whenever one log file yields NULL.
> My solution makes the code scan all of the written file paths until a schema is parsed successfully (a defensive guard for the all-NULL case is sketched after the snippet):
> {code:java}
> case MERGE_ON_READ:
>   // For a MOR table, the file the data was written to may be a parquet file, a .log file, an orc file or an hfile.
>   // Determine the file format based on the file name, then extract the schema from it.
>   if (instantAndCommitMetadata.isPresent()) {
>     HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
>     Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
>     MessageType type = null;
>     while (filePaths.hasNext() && type == null) {
>       String filePath = filePaths.next();
>       if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
>         // this is a log file
>         type = readSchemaFromLogFile(new Path(filePath));
>       } else {
>         type = readSchemaFromBaseFile(filePath);
>       }
>     }
>     return type;
>   } {code}
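> Even with this loop, a commit whose every written file is a delete-only log file would still return NULL and only move the NPE to the caller. A small guard at the end of the scan could surface a clear error instead; this is only my suggestion, not existing Hudi code (HoodieException is Hudi's generic runtime exception):
> {code:java}
> // Suggested hardening (my addition): fail loudly instead of returning null and
> // letting AvroSchemaConverter#convert NPE later in the call chain.
> if (type == null) {
>   throw new HoodieException("Cannot read schema: no data block found in any file of the latest commit");
> }
> return type; {code}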
--
This message was sent by Atlassian Jira
(v8.20.7#820007)