Posted to commits@hudi.apache.org by "qian heng (Jira)" <ji...@apache.org> on 2022/02/17 07:38:00 UTC

[jira] [Created] (HUDI-3444) NPE occurred when ingesting data to MOR table by Flink

qian heng created HUDI-3444:
-------------------------------

             Summary: NPE occurred when ingesting data to MOR table by Flink
                 Key: HUDI-3444
                 URL: https://issues.apache.org/jira/browse/HUDI-3444
             Project: Apache Hudi
          Issue Type: Bug
          Components: flink
    Affects Versions: 0.10.0
            Reporter: qian heng


**Describe the problem you faced**

An NPE occurred when ingesting data into a MOR table with Flink.

From the stack trace, the NPE occurs when Hudi tries to close the HoodieAppendHandle and write all remaining data in it to a log file. (The line numbers may differ because some logging was added.)

```
org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220216172134086
 at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:79)
 at org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor.execute(FlinkUpsertDeltaCommitActionExecutor.java:49)
 at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.upsert(HoodieFlinkMergeOnReadTable.java:72)
 at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:167)
 at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$6(StreamWriteFunction.java:548)
 at org.apache.hudi.sink.StreamWriteFunction.flushBucket(StreamWriteFunction.java:836)
 at org.apache.hudi.sink.StreamWriteFunction.bufferRecord(StreamWriteFunction.java:788)
 at org.apache.hudi.sink.StreamWriteFunction.processElement(StreamWriteFunction.java:298)
 at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:264)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:257)
 at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
 at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:426)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:688)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:643)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:654)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:627)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:831)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:612)
 at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
 at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
 at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
 at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:114)
 at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:70)
 at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:72)
 ... 22 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
 at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:106)
 at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:43)
 at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
 ... 26 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
 at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:188)
 at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:102)
 ... 28 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
 at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
 at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
 at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:182)
 ... 29 more
Caused by: java.lang.NullPointerException
 at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:387)
 at org.apache.hudi.io.HoodieAppendHandle.close(HoodieAppendHandle.java:416)
 at org.apache.hudi.io.FlinkAppendHandle.close(FlinkAppendHandle.java:94)
 at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:115)
 at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:80)
 at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:41)
 at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:162)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 ... 1 more
```

From reading the code, I think this NPE is caused at the following position:
BaseFlinkDeltaCommitActionExecutor.java
```java
  @Override
  public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
    return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table,
        idPfx, taskContextSupplier, new ExplicitWriteHandleFactory(writeHandle));
  }
```

BaseFlinkDeltaCommitActionExecutor uses ExplicitWriteHandleFactory as its writeHandleFactory, which leads to the same HoodieAppendHandle being reused in CopyOnWriteInsertHandler:

```java
    if (!handle.canWrite(payload.record)) {
      // Handle is full. Close the handle and add the WriteStatus
      statuses.addAll(handle.close());
      // Open new handle
      handle = writeHandleFactory.create(config, instantTime, hoodieTable,
          insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
      handles.put(partitionPath, handle);
    }
    handle.write(insertPayload, payload.insertValue, payload.exception);
  }
```

Thus, when a log file fills up, its handle is closed, which also clears the writer in the HoodieAppendHandle. The next time the same HoodieAppendHandle is used, accessing the writer leads to an NPE.
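
The suspected mechanism can be illustrated with a minimal, self-contained sketch. It does not use Hudi's classes: `Writer`, `AppendHandle`, `HandleFactory` and the record-count limit are hypothetical stand-ins, and the sketch assumes that closing a handle releases its writer and that the fill counter is not reset afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HandleReuseSketch {

  /** Hypothetical stand-in for a log writer (not Hudi's HoodieLogFormat.Writer). */
  static class Writer {
    String getLogFile() {
      return "log-file-1";
    }
  }

  /** Hypothetical stand-in for an append handle whose writer is cleared on close. */
  static class AppendHandle {
    private Writer writer = new Writer();
    private int written = 0;
    private final int maxRecords;

    AppendHandle(int maxRecords) {
      this.maxRecords = maxRecords;
    }

    boolean canWrite() {
      return written < maxRecords;
    }

    void write(String record) {
      written++;
    }

    List<String> close() {
      // Flushing touches the writer; on a second close the writer is already null.
      String logFile = writer.getLogFile();
      List<String> statuses = new ArrayList<>();
      statuses.add(written + " records -> " + logFile);
      writer = null; // assumption: the writer is released when the handle closes
      return statuses;
    }
  }

  /** Hypothetical factory abstraction mirroring writeHandleFactory.create(...). */
  interface HandleFactory {
    AppendHandle create();
  }

  /** Mirrors the consume loop quoted above; returns true if an NPE was hit. */
  static boolean consume(List<String> records, HandleFactory factory) {
    AppendHandle handle = factory.create();
    try {
      for (String record : records) {
        if (!handle.canWrite()) {
          handle.close();            // handle is full: close it
          handle = factory.create(); // "open new handle"
        }
        handle.write(record);
      }
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    List<String> records = Arrays.asList("r1", "r2", "r3", "r4");
    AppendHandle shared = new AppendHandle(2);
    // A factory that always hands back the same handle, like an explicit-handle factory.
    System.out.println("reused handle hits NPE: " + consume(records, () -> shared));
    // A factory that really creates a new handle each time.
    System.out.println("fresh handles hit NPE: " + consume(records, () -> new AppendHandle(2)));
  }
}
```

With the shared handle, the first close succeeds and the second close fails on the null writer; with a factory that creates a fresh handle, the loop completes normally.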

**To Reproduce**

Steps to reproduce the behavior:

1. Use Flink to ingest data into a MOR table.
2. Set hoodie.parquet.max.file.size to a small value, so that `handle.canWrite(payload.record)` easily becomes false and the handle is closed.
3. When the next record is consumed, the NPE occurs.
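
As a rough sketch of the configuration these steps describe, the following helper assembles the table options for a Flink SQL `WITH (...)` clause. The helper class, the example path and the 1 KB size are hypothetical. The sketch also assumes that `hoodie.parquet.max.file.size` set on the table is passed through to the Hudi write config.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReproOptionsSketch {

  /** Builds table options for a MOR table with a deliberately tiny max file size. */
  static Map<String, String> reproOptions(String basePath, long maxFileSizeBytes) {
    Map<String, String> options = new LinkedHashMap<>();
    options.put("connector", "hudi");
    options.put("path", basePath); // hypothetical location, adjust to your cluster
    options.put("table.type", "MERGE_ON_READ");
    // Small value so that canWrite(...) turns false after only a few records.
    options.put("hoodie.parquet.max.file.size", String.valueOf(maxFileSizeBytes));
    return options;
  }

  /** Renders the options as the body of a Flink SQL WITH (...) clause. */
  static String toWithClause(Map<String, String> options) {
    List<String> lines = new ArrayList<>();
    for (Map.Entry<String, String> e : options.entrySet()) {
      lines.add("  '" + e.getKey() + "' = '" + e.getValue() + "'");
    }
    return "WITH (\n" + String.join(",\n", lines) + "\n)";
  }

  public static void main(String[] args) {
    Map<String, String> options = reproOptions("hdfs:///tmp/hudi/mor_repro", 1024L);
    System.out.println(toWithClause(options));
  }
}
```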

**Expected behavior**

A clear and concise description of what you expected to happen.

**Environment Description**

* Hudi version : 0.10.0

* Flink version : 1.13.1

* Storage (HDFS/S3/GCS..) : hdfs

* Running on Docker? (yes/no) : no
