You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "lanyuanxiaoyao (Jira)" <ji...@apache.org> on 2022/04/30 08:25:00 UTC

[jira] [Created] (HUDI-4003) Flink offline compaction cause NPE when log file is full of delete operation

lanyuanxiaoyao created HUDI-4003:
------------------------------------

             Summary: Flink offline compaction cause NPE when log file is full of delete operation
                 Key: HUDI-4003
                 URL: https://issues.apache.org/jira/browse/HUDI-4003
             Project: Apache Hudi
          Issue Type: Bug
          Components: compaction, flink
    Affects Versions: 0.11.1
            Reporter: lanyuanxiaoyao


Environment: Hudi 0.12.0 (Latest master), Flink 1.13.3, JDK 8

My test:
 # Two partitions: p1, p2
 # Write data that p1 only delete record, p2 only update record
 # Run offline compaction and it cause NPE

{code:java}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:264)
    at org.apache.hudi.common.table.TableSchemaResolver.convertParquetSchemaToAvro(TableSchemaResolver.java:341)
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:148)
    at org.apache.hudi.util.CompactionUtil.inferChangelogMode(CompactionUtil.java:131)
    at org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.<init>(HoodieFlinkCompactor.java:173)
    at com.lanyuanxiaoyao.Compactor.main(Compactor.java:25) {code}
Reson & Resolution:
 # Flink offline compaction would get schema from latest data file to check  '_hoodie_operation' field have set or not (org.apache.hudi.util.CompactionUtil#inferChangelogMode).
 # For MOR table, it may get schema from a log file in random. But if it choose the log file that only contains delete operation, the code will get a NULL as result. (org.apache.hudi.common.table.TableSchemaResolver#readSchemaFromLogFile(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path))
 # Finally, it throw NPE when the code want to get the schema name. (org.apache.parquet.avro.AvroSchemaConverter#convert(org.apache.parquet.schema.MessageType))

{code:java}
case MERGE_ON_READ:
  // For MOR table, the file has data written may be a parquet file, .log file, orc file or hfile.
  // Determine the file format based on the file name, and then extract schema from it.
  if (instantAndCommitMetadata.isPresent()) {
    HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
    String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
    if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
      // this is a log file
      return readSchemaFromLogFile(new Path(filePath));
    } else {
      return readSchemaFromBaseFile(filePath);
    }
  } {code}
I think that can try another log file to parse schema when it get NULL from a log file.

My solution is make the code try to scan all the file path to parse schema until success.
{code:java}
case MERGE_ON_READ:
  // For MOR table, the file has data written may be a parquet file, .log file, orc file or hfile.
  // Determine the file format based on the file name, and then extract schema from it.
  if (instantAndCommitMetadata.isPresent()) {
    HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
    Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
    MessageType type = null;
    while (filePaths.hasNext() && type == null) {
      String filePath = filePaths.next();
      if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
        // this is a log file
        type = readSchemaFromLogFile(new Path(filePath));
      } else {
        type = readSchemaFromBaseFile(filePath);
      }
    }
    return type;
  } {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)