Posted to user-zh@flink.apache.org by Congxian Qiu <qc...@gmail.com> on 2020/11/04 05:28:54 UTC

Re: Re:Re: flink-1.10 checkpoint occasionally throws NullPointerException

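Hi
    This looks like a case where, on certain JDK versions, an object used in a particular coding pattern gets reclaimed prematurely; my guess is that it is related to GC. I came across a post earlier that may be relevant [1].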

[1] https://cloud.tencent.com/developer/news/564780

Best,
Congxian


蒋佳成(Jiacheng Jiang) <92...@qq.com> wrote on Wed, Nov 4, 2020 at 11:33 AM:

> Hi, I ran into this problem too. What is its root cause?
>
>
>
> ------------------ Original Message ------------------
> From: "chenkaibit" <chenkaibit@163.com>;
> Sent: Saturday, May 9, 2020, 12:09 PM
> To: "user-zh" <user-zh@flink.apache.org>;
> Subject: Re:Re:Re: flink-1.10 checkpoint occasionally throws NullPointerException
>
>
>
> Hi:
> After adding some logging, I found that checkpointMetaData had become NULL:
> https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1421
> The test program reads from Kafka, runs a wordcount, and writes the results back to Kafka. The checkpoint configuration is as follows:
> | Checkpointing Mode | Exactly Once |
> | Interval | 5s |
> | Timeout | 10m 0s |
> | Minimum Pause Between Checkpoints | 0ms |
> | Maximum Concurrent Checkpoints | 1 |
>
>
> The NPE is thrown consistently at checkpoint number 5377.
>
>
> The cause is still unclear, but after modifying part of the code (see
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
> ) the NPE no longer occurs.
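
The message above pinned the failure on one null reference by adding logging. A related idea, sketched here in plain Java, is to check each reference separately with a named message so the exception itself identifies the culprit. This is only an illustration: the `Metadata` and `Options` classes and the `describe` method are hypothetical stand-ins, not Flink classes, and this is not the change made in the linked commit.

```java
import java.util.Objects;

public class NullCheckSketch {

    // Hypothetical stand-in for checkpoint metadata; NOT a Flink class.
    static final class Metadata {
        final long checkpointId;

        Metadata(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    // Hypothetical stand-in for checkpoint options; NOT a Flink class.
    static final class Options {
        final String mode;

        Options(String mode) {
            this.mode = mode;
        }
    }

    // Check every reference on its own line with its own message, so a
    // failure names the null reference instead of only a line number.
    static String describe(Metadata metadata, Options options) {
        Objects.requireNonNull(metadata, "checkpointMetaData is null");
        Objects.requireNonNull(options, "checkpointOptions is null");
        return "checkpoint " + metadata.checkpointId + " (" + options.mode + ")";
    }

    public static void main(String[] args) {
        System.out.println(describe(new Metadata(5377L), new Options("EXACTLY_ONCE")));
        try {
            describe(null, new Options("EXACTLY_ONCE"));
        } catch (NullPointerException e) {
            // The message now says which reference was null.
            System.out.println("NPE: " + e.getMessage());
        }
    }
}
```

Checks like these only localize the null reference; they do not explain why it became null.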
>
>
> On 2020-04-21 10:21:56, "chenkaibit" <chenkaibit@163.com> wrote:
> >
> > This does not reproduce reliably, but it has shown up in several jobs I tested on 1.10 recently, and there were no other errors when it was triggered. I have added some logging and will keep observing.
> >
> > On 2020-04-21 01:12:48, "Yun Tang" <myasuka@live.com> wrote:
> >> Hi
> >>
> >> This NPE is a bit odd. From the executeCheckpointing method [1] alone, it is hard to pin down which variable, or which variable's value, is null.
> >> One way to investigate is to enable DEBUG-level logging for org.apache.flink.streaming.runtime.tasks and use the debug output to narrow down which variable is null.
> >>
> >> When this exception occurs, are there any abnormal log entries on the affected task? What conditions trigger the NPE, and does it reproduce reliably?
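
As a rough illustration of the DEBUG logging suggested above: assuming the default Log4j 1.x setup that Flink 1.10 ships with (`conf/log4j.properties`), a single logger line along these lines should raise the level for that package only. Deployments with a customized logging backend, or later Flink versions that use Log4j 2, need a different syntax.

```properties
# Assumption: default Flink 1.10 logging via Log4j 1.x (conf/log4j.properties).
# Raise only the stream task package to DEBUG and leave the root logger as is.
log4j.logger.org.apache.flink.streaming.runtime.tasks=DEBUG
```

With the default setup, the TaskManager most likely has to be restarted for the change to take effect.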
> >>
> >> [1]
> https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1349
> >>
> >> Best regards,
> >> Yun Tang
> >>
> >> ________________________________
> >> From: chenkaibit <chenkaibit@163.com>
> >> Sent: Monday, April 20, 2020 18:39
> >> To: user-zh@flink.apache.org <user-zh@flink.apache.org>
> >> Subject: flink-1.10 checkpoint occasionally throws NullPointerException
> >>
> >> Has anyone run into this error? A NullPointerException is thrown during CheckpointingOperation.executeCheckpointing:
>
> >> java.lang.Exception: Could not perform checkpoint 5505 for operator Source: KafkaTableSource(xxx) -> SourceConversion(table=[xxx, source: [KafkaTableSource(xxx)]], fields=[xxx]) -> Calc(select=[xxx) AS xxx]) -> SinkConversionToTuple2 -> Sink: Elasticsearch6UpsertTableSink(xxx) (1/1).
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$228/1024478318.call(Unknown Source)
> >>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> >>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> >>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> >>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >>     at java.lang.Thread.run(Thread.java:745)
> >> Caused by: java.lang.NullPointerException
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$229/1010499540.run(Unknown Source)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> >>     ... 12 more