You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2022/04/12 09:31:00 UTC

[jira] [Commented] (FLINK-27202) NullPointerException on stop-with-savepoint with AsyncWaitOperator followed by FlinkKafkaProducer

    [ https://issues.apache.org/jira/browse/FLINK-27202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521037#comment-17521037 ] 

Dawid Wysakowicz commented on FLINK-27202:
------------------------------------------

I think it should be fixed in 1.14 already. It is not necessarily fixed by the way we handle stop-with-savepoint per se. It should be fixed by the way we call {{close/(and/or previously dispose)}}. I'd say it should rather be fixed with e.g. FLINK-22972 and other parts of the FLINK-2491 effort. We call close only once the mailbox is closed now.

> NullPointerException on stop-with-savepoint with AsyncWaitOperator followed by FlinkKafkaProducer 
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27202
>                 URL: https://issues.apache.org/jira/browse/FLINK-27202
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Runtime / Task
>    Affects Versions: 1.12.7, 1.13.6
>            Reporter: Piotr Nowojski
>            Priority: Critical
>
> Some lingering mails from {{AsyncWaitOperator}} (or other operators using mailbox, or maybe even processing time timers), that are chained with {{FlinkKafkaProducer}} can cause the following exceptions when using stop-with-savepoint:
> {noformat}
> 2022-04-11 15:46:19,781 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - static enrichment -> Map -> Sink: enriched events sink (179/256) (3fefa588ad05fa8d2a10a6ad4a740cc6) switched from RUNNING to FAILED on 10.239.104.67:38149-12df6c @ 10.239.104.67 (dataPort=35745).
> java.lang.NullPointerException: null
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$TransactionHolder.access$000(TwoPhaseCommitSinkFunction.java:591) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:287) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:78) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:356) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:337) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.drain(MailboxProcessor.java:170) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:647) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:591) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
> 	at java.lang.Thread.run(Thread.java:829) ~[?:?]
> {noformat}
> This happens since {{FlinkKafkaProducer}} can be closed, without quiescing the mailbox. This issue might have been fixed by either FLINK-23532
> or FLINK-23408.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)