Posted to user-zh@flink.apache.org by yidan zhao <hi...@gmail.com> on 2022/08/23 12:31:33 UTC

flink1.15.1 stop job fails

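As the subject says, "flink stop" (stop with a savepoint) fails.
In my tests, both "cancel" and "cancel -s" succeed; "cancel -s" creates the savepoint and exits successfully.

"stop", however, does not work. The main error is: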
Could not stop with a savepoint job "1b87f308e2582f3cc0e3ccc812471201"
...
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
...
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
...
Caused by: org.apache.flink.util.SerializedThrowable: Task has failed.
...

Detailed logs:
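The client-side chain above bottoms out at "Task has failed." without naming the operator or the exception responsible; the actual cause only shows up in the JobManager/TaskManager logs. As a small illustration that does not depend on Flink, the sketch below walks getCause() down to the innermost exception. The Flink classes in the chain (CheckpointException, SerializedThrowable) are replaced by plain JDK exceptions so that it runs without Flink on the classpath; RootCauseSketch is a hypothetical name.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class RootCauseSketch {
    /** Follows getCause() to the innermost throwable; the identity set stops on cyclic cause chains. */
    static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-ins for the client-side chain quoted above (Flink classes replaced by JDK exceptions).
        Throwable taskFailed = new RuntimeException("Task has failed.");
        Throwable checkpoint = new RuntimeException("CheckpointException: Task has failed.", taskFailed);
        Throwable chain = new ExecutionException(new CompletionException(checkpoint));
        System.out.println(rootCause(chain).getMessage()); // prints "Task has failed."
    }
}
```

Even fully unwrapped, the client only sees "Task has failed.", which is why the TaskManager logs are needed to find the real cause.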

Re: Re: flink1.15.1 stop job fails

Posted by yidan zhao <hi...@gmail.com>.
Yes, I had tried that before as well: KafkaSource does work; only FlinkKafkaConsumer fails.

Re: Re: flink1.15.1 stop job fails

Posted by yanfei lei <fr...@gmail.com>.
Hi yidan && hjw,
I reproduced this problem locally with FlinkKafkaConsumer as well, but with KafkaSource stop-with-savepoint works normally. FlinkKafkaConsumer is deprecated in Flink 1.15 [1]; I'd recommend trying again with the new KafkaSource.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sourcefunction

Best,
Yanfei

On Tue, Aug 23, 2022 at 23:39, hjw <10...@qq.com.invalid> wrote:

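> I think this problem is caused by the Kafka connector using the old API. It can be reproduced just by running locally in IDEA. I have already filed a Jira issue for it:
> https://issues.apache.org/jira/browse/FLINK-28758
> There has been no feedback from the community yet.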
>
> ------------------ Original message ------------------
> From: "user-zh" <hinobleyd@gmail.com>;
> Sent: Tuesday, August 23, 2022, 11:09 PM
> To: "user-zh" <user-zh@flink.apache.org>;
> Subject: Re: Re: flink1.15.1 stop job fails
>
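> 1. Most likely the problem is in the source, or at the level of how the savepoint is triggered.
> 2. Maybe we could also look at how cancel and stop differ?
> 3. Additional info: my Kafka source uses the old version. (I can't use the new one because, for various reasons, I have to stay on an old Kafka client version.)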
>
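> On Tue, Aug 23, 2022 at 23:06, yidan zhao <hinobleyd@gmail.com> wrote:
> >
> > I took a look; there are very few errors.
> > In any case, flink cancel -s works and flink stop does not. It also appears to fail instantly: in the web UI the savepoint shows 0/841 completed, so it must have failed almost before it started.
> > There are currently 4 machines.
> > Machine 1: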
> > 2022-08-23 22:47:37,093 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: JobConfig -> Split(JobName_configType) (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Stop-with-savepoint failed.
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> >         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> >         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> >         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> >         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> >         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> >         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >         at java.lang.Thread.run(Thread.java:748)
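> > After that there are just lots of "free task", "unregister" messages and the like.
> >
> > Machine 2:
> > ...
> > Basically everything seems to start from "Attempt to cancel"; there does not appear to be any error logged before the cancel.
> >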
> > On Tue, Aug 23, 2022 at 22:25, Xuyang <xyzhong131@163.com> wrote:
> > >
> > > Hi, are there any error messages on the TM? If so, please post them so we can see what caused the checkpoint to fail.
> > >
> > > --
> > > Best!
> > > Xuyang
> > >
> > > On 2022-08-23 20:41:59, "yidan zhao" <hinobleyd@gmail.com> wrote:
> > > > Some additional information:
> > > > From the logs: when a savepoint is triggered with flink savepoint xxx, the JM log is very simple:
> > > > 2022-08-23 20:33:22,307 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering savepoint for job 8d231de75b8227a1b715b1aa665caa91.
> > > >
> > > > 2022-08-23 20:33:22,318 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 5 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
> > > >
> > > > 2022-08-23 20:33:23,701 INFO  org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream [] - Cannot create recoverable writer due to Recoverable writers on Hadoop are only supported for HDFS, will use the ordinary writer.
> > > >
> > > > 2022-08-23 20:33:23,908 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91 (1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
> > > >
> > > > When the job is stopped with flink stop xxx, the JM log (with the error) is:
> > > >
> > > > 2022-08-23 20:35:01,834 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering stop-with-savepoint for job 8d231de75b8227a1b715b1aa665caa91.
> > > >
> > > > 2022-08-23 20:35:01,842 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834 for job 8d231de75b8227a1b715b1aa665caa91.
> > > >
> > > > 2022-08-23 20:35:02,083 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job 8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @ xxx.xxx.com (dataPort=13156).
> > > > (It looks like the checkpoint was declined here. Is the reason that the task failed?)
> > > > org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: XXX_Kafka(startTs:latest) -> ... -> ... -> ... (10/10)#2 Failure reason: Task has failed.
> > > >         at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_251]
> > > >         at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_251]
> > > >         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_251]
> > > >         at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_251]
> > > >         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
> > > >         at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_251]
> > > >         at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_251]
> > > >         at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_251]
> > > >         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_251]
> > > >         ... 3 more
> > > > Caused by: org.apache.flink.util.SerializedThrowable
> > > >         at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177) ~[?:?]
> > > >         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164) ~[?:?]
> > > >         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:1002) ~[?:?]
> > > >         at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.1.jar:1.15.1]
> > > >         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
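The JM logs above show the two commands taking different paths. "flink savepoint" triggers a savepoint with postCheckpointAction=NONE while the sources keep running, and "flink cancel -s" is described by the CLI help as "trigger savepoint and cancel job". "flink stop" instead triggers a 'Suspend Savepoint', and the TaskManager trace shows the legacy source being stopped (StreamSource.stop -> FlinkKafkaConsumerBase.cancel) before that savepoint can complete. The toy Java model below sketches that ordering. It is inferred from these logs, not taken from Flink's source: StopVsCancelSketch, LegacyKafkaLikeSource and the event strings are hypothetical, the IllegalStateException stands in for Handover$ClosedException, and it assumes that an exception thrown while a job is being canceled is not treated as a failure.

```java
import java.util.ArrayList;
import java.util.List;

class StopVsCancelSketch {
    /** Hypothetical legacy source: after cancel(), its run loop ends with an exception instead of returning. */
    static final class LegacyKafkaLikeSource {
        private boolean cancelled;

        void cancel() {
            cancelled = true;
        }

        /** Simulates waiting for the source thread to finish; the exception stands in for Handover$ClosedException. */
        void awaitRunLoopExit() {
            if (cancelled) {
                throw new IllegalStateException("ClosedException (simulated)");
            }
        }
    }

    /** flink savepoint / flink cancel -s: take the savepoint while the source runs, then cancel. */
    static List<String> cancelWithSavepoint(LegacyKafkaLikeSource source) {
        List<String> events = new ArrayList<>();
        events.add("savepoint completed");                 // postCheckpointAction=NONE, source untouched
        source.cancel();
        try {
            source.awaitRunLoopExit();
        } catch (IllegalStateException e) {
            events.add("exception during cancel ignored"); // assumption: the job is being canceled anyway
        }
        events.add("job canceled");
        return events;
    }

    /** flink stop: stop the legacy source first; the savepoint needs it to exit cleanly. */
    static List<String> stopWithSavepoint(LegacyKafkaLikeSource source) {
        List<String> events = new ArrayList<>();
        source.cancel();                                   // StreamSource.stop -> FlinkKafkaConsumerBase.cancel
        try {
            source.awaitRunLoopExit();
            events.add("savepoint completed");             // postCheckpointAction=SUSPEND
            events.add("job finished");
        } catch (IllegalStateException e) {
            events.add("savepoint declined: Task has failed.");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(cancelWithSavepoint(new LegacyKafkaLikeSource()));
        // [savepoint completed, exception during cancel ignored, job canceled]
        System.out.println(stopWithSavepoint(new LegacyKafkaLikeSource()));
        // [savepoint declined: Task has failed.]
    }
}
```

In this model the same source behavior is harmless for cancel -s, because the savepoint is already done when the exception appears, but fatal for stop, where a clean source exit is a precondition for the savepoint.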

Re: Re: flink1.15.1 stop job fails

Posted by hjw <10...@qq.com.INVALID>.
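I think this problem is caused by the Kafka connector using the old API. It can be reproduced just by running locally in IDEA. I have already filed a Jira issue for it: https://issues.apache.org/jira/browse/FLINK-28758
There has been no feedback from the community yet.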
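This diagnosis is consistent with the TaskManager trace quoted in this thread: during stop-with-savepoint, KafkaFetcher.cancel calls Handover.close, and the legacy source thread then ends with Handover$ClosedException instead of returning normally. The Java sketch below is a simplified, hypothetical model of that hand-over and fetch loop; MiniHandover, Fetcher and the tolerateCloseOnCancel switch are illustrative names, not Flink classes. It shows why a loop blocked in pollNext() surfaces close() as an exception, and how treating close-after-cancel as a clean exit would let the thread finish normally. That tolerant variant is only one conceivable mitigation, not necessarily what FLINK-28758 ended up doing.

```java
import java.util.concurrent.atomic.AtomicReference;

class HandoverSketch {
    /** Simplified, hypothetical single-slot hand-over between a fetcher thread and the source thread. */
    static final class MiniHandover {
        static final class ClosedException extends RuntimeException {
        }

        private String next;             // pending batch; no producer runs in this demo
        private boolean closed;
        private boolean consumerWaiting; // only used to make the demo deterministic

        synchronized String pollNext() throws InterruptedException {
            while (next == null && !closed) {
                consumerWaiting = true;
                wait();
            }
            consumerWaiting = false;
            if (next != null) {
                String batch = next;
                next = null;
                return batch;
            }
            throw new ClosedException(); // woken up by close(): surfaces as an exception
        }

        synchronized boolean isConsumerWaiting() {
            return consumerWaiting;
        }

        synchronized void close() {
            closed = true;
            notifyAll();
        }
    }

    static final class Fetcher {
        final MiniHandover handover = new MiniHandover();
        private volatile boolean running = true;
        private final boolean tolerateCloseOnCancel;

        Fetcher(boolean tolerateCloseOnCancel) {
            this.tolerateCloseOnCancel = tolerateCloseOnCancel;
        }

        void runFetchLoop() throws InterruptedException {
            while (running) {
                try {
                    handover.pollNext();     // a real fetcher would emit the returned records
                } catch (MiniHandover.ClosedException e) {
                    if (tolerateCloseOnCancel && !running) {
                        return;              // closed because we were cancelled: exit cleanly
                    }
                    throw e;                 // strict variant: the source thread fails
                }
            }
        }

        /** Flags the loop to stop, then closes the hand-over (the trace shows cancel calling close). */
        void cancel() {
            running = false;
            handover.close();
        }
    }

    /** Runs the loop on a thread, cancels it while it is blocked, and reports how the thread ended. */
    static String runAndCancel(boolean tolerant) throws InterruptedException {
        Fetcher fetcher = new Fetcher(tolerant);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread sourceThread = new Thread(() -> {
            try {
                fetcher.runFetchLoop();
            } catch (Throwable e) {
                failure.set(e);
            }
        });
        sourceThread.start();
        while (!fetcher.handover.isConsumerWaiting()) {
            Thread.sleep(1);                 // wait until the loop is blocked in pollNext()
        }
        fetcher.cancel();
        sourceThread.join();
        return failure.get() == null ? "clean exit" : "failed: " + failure.get().getClass().getSimpleName();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndCancel(false)); // failed: ClosedException
        System.out.println(runAndCancel(true));  // clean exit
    }
}
```

The spin-wait on isConsumerWaiting() exists only to make the demo deterministic: it ensures cancel() arrives while the loop is blocked in pollNext(), which is the situation the trace shows.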


Re: Re: flink1.15.1 stop job fails

Posted by yidan zhao <hi...@gmail.com>.
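1 Most likely the problem is in the source, or at the savepoint-trigger level.
2 It may also be worth looking at the difference between cancel and stop.
3 Additional info: my Kafka source uses the old version (I can't switch to the new one, because for various reasons I have to use a low-version Kafka client).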
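On point 2, the logs in this thread suggest a difference in ordering between the two commands:

- With cancel -s, the savepoint is triggered while the source is still running, and the job is cancelled afterwards.
- With stop, the legacy source is stopped as part of the savepoint itself. In the trace, StreamSource.stop appears under SourceStreamTask.stopOperatorForStopWithSavepoint.

The Java toy model below encodes only that assumed ordering. ToyStopVsCancel and ToySource are hypothetical names, and Flink's real task and checkpoint handling has many more states than this.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of the ordering difference; not Flink's real task logic. */
final class ToyStopVsCancel {
    /** A source whose shutdown may throw, like the legacy Kafka source in this thread. */
    interface ToySource {
        void shutDown() throws Exception;
    }

    /** "flink cancel -s": take the savepoint first, then cancel the job. */
    static List<String> cancelWithSavepoint(ToySource source) {
        List<String> events = new ArrayList<>();
        events.add("savepoint completed"); // taken while the source is still running
        try {
            source.shutDown();
            events.add("source shut down cleanly");
        } catch (Exception e) {
            // The job is already being cancelled, so in this model the error
            // is only recorded and does not affect the finished savepoint.
            events.add("shutdown error ignored: " + e.getMessage());
        }
        events.add("job CANCELED");
        return events;
    }

    /** "flink stop": the source is stopped as part of the suspend savepoint. */
    static List<String> stopWithSavepoint(ToySource source) {
        List<String> events = new ArrayList<>();
        try {
            source.shutDown();
            events.add("source shut down cleanly");
        } catch (Exception e) {
            // The source exits exceptionally before the savepoint is done,
            // so in this model the task fails and declines the savepoint.
            events.add("task FAILED: " + e.getMessage());
            events.add("savepoint declined");
            return events;
        }
        events.add("savepoint completed");
        events.add("job stopped");
        return events;
    }

    public static void main(String[] args) {
        ToySource legacyKafka = () -> { throw new Exception("Handover$ClosedException"); };
        System.out.println(cancelWithSavepoint(legacyKafka));
        System.out.println(stopWithSavepoint(legacyKafka));
    }
}
```

With a source that throws on shutdown, cancelWithSavepoint still records a completed savepoint. stopWithSavepoint ends with the savepoint declined, which matches the symptom reported here (cancel -s works, stop does not).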

Re: Re: flink1.15.1 stop job fails

Posted by yidan zhao <hi...@gmail.com>.
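Took a look; there are very few errors.
Anyway, flink cancel -s works, but flink stop does not, and it appears to fail instantly. In the web
UI, the savepoint completion was 0/841, so it must have failed almost before it started.
There are currently 4 machines:
Machine 1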
2022-08-23 22:47:37,093 WARN
org.apache.flink.runtime.taskmanager.Task                    [] -
Source: JobConfig -> Split(JobName_configType)
 (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from RUNNING to
FAILED with failure cause:
org.apache.flink.util.FlinkRuntimeException: Stop-with-savepoint failed.
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.lang.Thread.run(Thread.java:748)
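What follows is just the usual freeing of tasks, unregistering, and so on.

Machine 2
...
It basically all seems to start from "Attempt to cancel"; there does not appear to be an error first and then a cancel.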
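The 0/841 progress fits with how a single decline works.

In the simplified Java model below, a pending savepoint needs an acknowledgement from every task, but the first decline aborts it outright. The JobManager log in this thread shows "Decline checkpoint 6 by task ..." about 0.24 s after the savepoint was triggered. If the Kafka source subtask declines that quickly, the UI can show no acknowledgements at all.

ToyPendingCheckpoint and the task IDs are hypothetical. Flink's real CheckpointCoordinator has more states and settings than this.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical toy pending savepoint; not Flink's CheckpointCoordinator. */
final class ToyPendingCheckpoint {
    private final int expectedTasks;
    private final Set<String> acknowledged = new HashSet<>();
    private String declinedBy; // null until some task declines

    ToyPendingCheckpoint(int expectedTasks) {
        this.expectedTasks = expectedTasks;
    }

    /** Records an acknowledgement, unless the checkpoint is already aborted. */
    void acknowledge(String taskId) {
        if (declinedBy == null) {
            acknowledged.add(taskId);
        }
    }

    /** A single decline aborts the whole checkpoint, however many acks arrived. */
    void decline(String taskId) {
        if (declinedBy == null) {
            declinedBy = taskId;
        }
    }

    String status() {
        String progress = acknowledged.size() + "/" + expectedTasks;
        if (declinedBy != null) {
            return "ABORTED by " + declinedBy + " at " + progress;
        }
        return acknowledged.size() == expectedTasks ? "COMPLETED" : "IN_PROGRESS " + progress;
    }

    public static void main(String[] args) {
        // 841 is the number of acknowledgements the web UI was waiting for in this thread.
        ToyPendingCheckpoint savepoint = new ToyPendingCheckpoint(841);
        savepoint.decline("kafka-source-10/10"); // source declines before anyone acks
        savepoint.acknowledge("sink-1/10");      // a late ack is ignored
        System.out.println(savepoint.status());  // ABORTED by kafka-source-10/10 at 0/841
    }
}
```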

Re:Re: flink1.15.1 stop job fails

Posted by Xuyang <xy...@163.com>.
Hi, are there any error messages on the TM? If so, please post them so we can see what caused the checkpoint to fail.

--

    Best!
    Xuyang

On 2022-08-23 20:41:59, "yidan zhao" <hi...@gmail.com> wrote:
>Some additional information:
>Looking at the logs, when a savepoint is triggered with flink savepoint xxx, the JM log is very simple:
>2022-08-23 20:33:22,307 INFO
>org.apache.flink.runtime.jobmaster.JobMaster                 [] -
>Triggering savepoint for job 8d231de75b8227a1b715b1aa665caa91.
>
>2022-08-23 20:33:22,318 INFO
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
>Triggering checkpoint 5 (type=SavepointType{name='Savepoint',
>postCheckpointAction=NONE, formatType=CANONICAL}) @ 1661258002307
>for job 8d231de75b8227a1b715b1aa665caa91.
>
>2022-08-23 20:33:23,701 INFO
>org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
>[] - Cannot create recoverable writer due to Recoverable writers on Hadoop are only supported for HDFS,
>will use the ordinary writer.
>
>2022-08-23 20:33:23,908 INFO
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
>Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
>(1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
>
>
>If the job is stopped with stop xxx, the JM log (with the error) is as follows:
>
>2022-08-23 20:35:01,834 INFO org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering stop-with-savepoint for job 8d231de75b8227a1b715b1aa665caa91.
>
>2022-08-23 20:35:01,842 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834 for job 8d231de75b8227a1b715b1aa665caa91.
>
>2022-08-23 20:35:02,083 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job 8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @ xxx.xxx.com (dataPort=13156).
>(It looks like the checkpoint was declined here. Is the reason that the task failed?)
>org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: XXX_Kafka(startTs:latest) ->... ->... ->... (10/10)#2 Failure reason: Task has failed.
>        at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>~[?:1.8.0_251]
>        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>~[?:1.8.0_251]
>        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>~[?:1.8.0_251]
>        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>~[?:1.8.0_251]
>        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
>~[flink-dist-1.15.1.jar:1.15.1]
>Caused by: org.apache.flink.util.SerializedThrowable:
>org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>~[?:1.8.0_251]
>        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>~[?:1.8.0_251]
>        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
>~[?:1.8.0_251]
>        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>~[?:1.8.0_251]
>        ... 3 more
>Caused by: org.apache.flink.util.SerializedThrowable
>        at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
>~[?:?]
>        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
>~[?:?]
>        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:1002)
>~[?:?]
>        at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>~[flink-dist-1.15.1.jar:1.15.1]
>        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>
>On Tue, Aug 23, 2022 at 20:31, yidan zhao <hi...@gmail.com> wrote:
>>
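>> As the subject says: stop (stopping the job with a savepoint) fails.
>> In my tests, both cancel and cancel -s succeed; cancel -s successfully creates a savepoint and exits.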
>>
>> stop does not work; the main errors are
>> Could not stop with a savepoint job "1b87f308e2582f3cc0e3ccc812471201"
>> ...
>> Caused by: java.util.concurrent.ExecutionException:
>> java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
>> ...
>> Caused by: org.apache.flink.util.SerializedThrowable:
>> org.apache.flink.runtime.checkpoint.CheckpointException: Task has
>> failed.
>> ...
>> Caused by: org.apache.flink.util.SerializedThrowable: Task has failed.
>> ...
>>
>> ______Detailed log:

Re: flink1.15.1: stopping a job fails

Posted by yidan zhao <hi...@gmail.com>.
Some additional information:
Looking at the logs, when a savepoint is triggered with flink savepoint xxx, the JM log is simple:
2022-08-23 20:33:22,307 INFO org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering savepoint for job 8d231de75b8227a1b715b1aa665caa91.

2022-08-23 20:33:22,318 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 5 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.

2022-08-23 20:33:23,701 INFO org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream [] - Cannot create recoverable writer due to Recoverable writers on Hadoop are only supported for HDFS, will use the ordinary writer.

2022-08-23 20:33:23,908 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91 (1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).


If the job is stopped with stop xxx, the JM log (with the error) is as follows:

2022-08-23 20:35:01,834 INFO org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering stop-with-savepoint for job 8d231de75b8227a1b715b1aa665caa91.

2022-08-23 20:35:01,842 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834 for job 8d231de75b8227a1b715b1aa665caa91.

2022-08-23 20:35:02,083 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job 8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @ xxx.xxx.com (dataPort=13156).
(It looks like the checkpoint was declined here. Is the reason that the task failed?)
org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: XXX_Kafka(startTs:latest) ->... ->... ->... (10/10)#2 Failure reason: Task has failed.
        at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
~[flink-dist-1.15.1.jar:1.15.1]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
~[?:1.8.0_251]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
~[?:1.8.0_251]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_251]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
~[?:1.8.0_251]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
~[flink-dist-1.15.1.jar:1.15.1]
Caused by: org.apache.flink.util.SerializedThrowable:
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
~[?:1.8.0_251]
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
~[?:1.8.0_251]
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
~[?:1.8.0_251]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_251]
        ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable
        at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
~[?:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
~[?:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:1002)
~[?:?]
        at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
~[flink-dist-1.15.1.jar:1.15.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
~[flink-dist-1.15.1.jar:1.15.1]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
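The trace shows the call path for stop. `SourceStreamTask.stopOperatorForStopWithSavepoint` calls `StreamSource.stop`, then `FlinkKafkaConsumerBase.cancel`, then `KafkaFetcher.cancel`, then `Handover.close`. The legacy source thread then completes with `Handover$ClosedException`, and the task declines the savepoint with "Task has failed." A plain savepoint (postCheckpointAction=NONE) never goes through this path.

Below is a simplified, hypothetical Java model of that sequence, written as a sketch to illustrate the mechanism. It is not Flink's code. The classes `StopWithSavepointModel` and `Handover`, the method `runThenStop`, and the `toleratesCloseOnStop` switch are all illustrative assumptions. The "tolerant" variant is only there for contrast and is not a confirmed fix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of the stop-with-savepoint failure path in the trace.
// These are NOT Flink's real classes; the names only mirror frames in the stack trace.
public class StopWithSavepointModel {

    /** Hypothetical stand-in for the handover between the Kafka consumer thread and the fetch loop. */
    static final class Handover {
        static final class ClosedException extends Exception {
            private static final long serialVersionUID = 1L;
        }

        private final Object lock = new Object();
        private boolean closed;

        /** Blocks until the handover is closed; this model never delivers records. */
        void pollNext() throws InterruptedException, ClosedException {
            synchronized (lock) {
                while (!closed) {
                    lock.wait();
                }
                throw new ClosedException();
            }
        }

        /** Wakes up any blocked pollNext() call, which then throws ClosedException. */
        void close() {
            synchronized (lock) {
                closed = true;
                lock.notifyAll();
            }
        }
    }

    /**
     * Runs a fetch loop on a separate "legacy source thread", stops it in the order the trace
     * shows (stop, then cancel, then Handover.close) and returns the thread's completion future.
     *
     * @param toleratesCloseOnStop hypothetical variant: treat ClosedException as a clean exit
     *                             once a stop has been requested
     */
    static CompletableFuture<Void> runThenStop(boolean toleratesCloseOnStop)
            throws InterruptedException {
        Handover handover = new Handover();
        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch inFetchLoop = new CountDownLatch(1);
        CompletableFuture<Void> completion = new CompletableFuture<>();

        Thread sourceThread = new Thread(() -> {
            try {
                while (running.get()) {
                    inFetchLoop.countDown();
                    handover.pollNext(); // blocks waiting for records
                }
                completion.complete(null);
            } catch (Handover.ClosedException e) {
                if (toleratesCloseOnStop && !running.get()) {
                    completion.complete(null); // stop was requested: treat as a clean exit
                } else {
                    // Models the source thread failing, which the task reports as "Task has failed."
                    completion.completeExceptionally(e);
                }
            } catch (Throwable t) {
                completion.completeExceptionally(t);
            }
        });
        sourceThread.start();

        inFetchLoop.await();   // ensure the source thread is already inside the fetch loop
        running.set(false);    // stop(): ask the loop to end ...
        handover.close();      // ... and close the handover, like KafkaFetcher.cancel in the trace
        sourceThread.join();
        return completion;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("as in trace, failed: " + runThenStop(false).isCompletedExceptionally());
        System.out.println("tolerant variant, failed: " + runThenStop(true).isCompletedExceptionally());
    }
}
```

Running `main` prints `true` for the path that mirrors the trace and `false` for the tolerant variant. In this model, whether the source thread "fails" depends only on how the `ClosedException` raised by `close()` is handled after a stop has been requested.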

On Tue, Aug 23, 2022 at 20:31, yidan zhao <hi...@gmail.com> wrote:
>
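> As the subject says: stop (stopping the job with a savepoint) fails.
> In my tests, both cancel and cancel -s succeed; cancel -s successfully creates a savepoint and exits.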
>
> stop does not work; the main errors are
> Could not stop with a savepoint job "1b87f308e2582f3cc0e3ccc812471201"
> ...
> Caused by: java.util.concurrent.ExecutionException:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
> ...
> Caused by: org.apache.flink.util.SerializedThrowable:
> org.apache.flink.runtime.checkpoint.CheckpointException: Task has
> failed.
> ...
> Caused by: org.apache.flink.util.SerializedThrowable: Task has failed.
> ...
>
> ______Detailed log: