Posted to commits@hudi.apache.org by "Alexey Kudinkin (Jira)" <ji...@apache.org> on 2022/05/04 20:35:00 UTC

[jira] [Updated] (HUDI-4003) Flink offline compaction may cause NPE when log file only contains delete operation

     [ https://issues.apache.org/jira/browse/HUDI-4003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Kudinkin updated HUDI-4003:
----------------------------------
    Affects Version/s: 0.11.0
                           (was: 0.11.1)

> Flink offline compaction may cause NPE when log file only contains delete operation
> -----------------------------------------------------------------------------------
>
>                 Key: HUDI-4003
>                 URL: https://issues.apache.org/jira/browse/HUDI-4003
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: compaction, flink
>    Affects Versions: 0.11.0
>            Reporter: lanyuanxiaoyao
>            Priority: Critical
>              Labels: pull-request-available
>
> Environment: Hudi 0.12.0 (Latest master), Flink 1.13.3, JDK 8
> My test:
>  # Two partitions: p1, p2
>  # Write data so that p1 receives only delete records and p2 receives only update records
>  # Run offline compaction; it fails with an NPE
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>     at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:264)
>     at org.apache.hudi.common.table.TableSchemaResolver.convertParquetSchemaToAvro(TableSchemaResolver.java:341)
>     at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:148)
>     at org.apache.hudi.util.CompactionUtil.inferChangelogMode(CompactionUtil.java:131)
>     at org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.<init>(HoodieFlinkCompactor.java:173)
>     at com.lanyuanxiaoyao.Compactor.main(Compactor.java:25) {code}
> Reason & Resolution:
>  # Flink offline compaction reads the schema from the latest data file to check whether the '_hoodie_operation' field is set (org.apache.hudi.util.CompactionUtil#inferChangelogMode).
>  # For a MOR table, the schema may be read from an arbitrarily chosen log file. If the chosen log file contains only delete operations, the read returns NULL. (org.apache.hudi.common.table.TableSchemaResolver#readSchemaFromLogFile(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path))
>  # Finally, an NPE is thrown when the code tries to get the schema name. (org.apache.parquet.avro.AvroSchemaConverter#convert(org.apache.parquet.schema.MessageType))
> {code:java}
> case MERGE_ON_READ:
>   // For MOR table, the file has data written may be a parquet file, .log file, orc file or hfile.
>   // Determine the file format based on the file name, and then extract schema from it.
>   if (instantAndCommitMetadata.isPresent()) {
>     HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
>     String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
>     if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
>       // this is a log file
>       return readSchemaFromLogFile(new Path(filePath));
>     } else {
>       return readSchemaFromBaseFile(filePath);
>     }
>   } {code}
> I think the code could try another file to parse the schema when it gets NULL from a log file.
> My solution is to make the code scan all the file paths until a schema is parsed successfully.
> {code:java}
> case MERGE_ON_READ:
>   // For MOR table, the file has data written may be a parquet file, .log file, orc file or hfile.
>   // Determine the file format based on the file name, and then extract schema from it.
>   if (instantAndCommitMetadata.isPresent()) {
>     HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
>     Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
>     MessageType type = null;
>     while (filePaths.hasNext() && type == null) {
>       String filePath = filePaths.next();
>       if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
>         // this is a log file
>         type = readSchemaFromLogFile(new Path(filePath));
>       } else {
>         type = readSchemaFromBaseFile(filePath);
>       }
>     }
>     return type;
>   } {code}
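The scan-until-success idea above can be illustrated outside of Hudi with a minimal, self-contained sketch. Everything in it is an assumption made for illustration: the class FirstReadableSchema, the methods firstReadableSchema and readSchemaOrNull, and the file names are hypothetical and not part of the Hudi API, and schemas are modelled as plain strings rather than MessageType.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

class FirstReadableSchema {

  // Hypothetical stand-in for a per-file schema reader. It returns null when the
  // file carries no schema, mimicking a log file that holds only delete operations.
  static String readSchemaOrNull(String filePath) {
    return filePath.contains("delete-only") ? null : "schema-of:" + filePath;
  }

  // Try each path in order and return the first non-null schema.
  // The stream is lazy, so no further files are read once a schema is found.
  static <S> Optional<S> firstReadableSchema(List<String> filePaths, Function<String, S> reader) {
    return filePaths.stream()
        .map(reader)
        .filter(Objects::nonNull)
        .findFirst();
  }

  public static void main(String[] args) {
    // Illustrative paths only: p1 has a delete-only log file, p2 has a base file.
    List<String> paths = Arrays.asList("p1/delete-only.log", "p2/base.parquet");
    Optional<String> schema = firstReadableSchema(paths, FirstReadableSchema::readSchemaOrNull);
    System.out.println(schema.orElse("no schema found"));
  }
}
```

Returning an Optional instead of a possibly-null value forces the caller to handle the case where no file yields a schema, which is the situation that surfaces as the NPE in the stack trace above.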


