Posted to issues@flink.apache.org by "Yanfei Lei (Jira)" <ji...@apache.org> on 2022/10/13 10:45:00 UTC

[jira] [Commented] (FLINK-28758) Failed to stop with savepoint

    [ https://issues.apache.org/jira/browse/FLINK-28758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616946#comment-17616946 ] 

Yanfei Lei commented on FLINK-28758:
------------------------------------

Hi [~hjw], according to the log, this happened because the source had failed when you posted the savepoint request.

As for "It is successful to use savepoint command alone.", that is because the job was running when you sent that request. I think this is expected behavior, since the implementation of stop-with-savepoint behind REST and the CLI is the same.
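
For reference, a minimal sketch of the REST side of that request, assuming the documented `POST /jobs/:jobid/stop` endpoint with a `targetDirectory` and `drain` body, and the `state` field of the `GET /jobs/:jobid` response. The base URL and savepoint directory are placeholders, the helper names are hypothetical, and the job ID is the one from the log below. The request is only built here, not sent.

```python
import json
import urllib.request


def build_stop_with_savepoint_request(base_url, job_id, target_directory, drain=False):
    """Build (but do not send) a POST /jobs/<job_id>/stop request."""
    url = f"{base_url.rstrip('/')}/jobs/{job_id}/stop"
    body = json.dumps(
        {"targetDirectory": target_directory, "drain": drain}
    ).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def is_running(job_details):
    """Return True if a parsed GET /jobs/<job_id> response reports RUNNING."""
    return job_details.get("state") == "RUNNING"


if __name__ == "__main__":
    # Placeholder address and savepoint directory (assumptions, not from the issue).
    request = build_stop_with_savepoint_request(
        "http://localhost:8081",
        "d6fed247feab1c0bcc1b0dcc2cfb4736",
        "file:///tmp/savepoints",
    )
    print(request.get_method(), request.full_url)
    print(request.data.decode("utf-8"))
```

Checking `is_running` on the job details before sending the request only narrows the window; the job can still fail between the check and the stop request.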

> Failed to stop with savepoint 
> ------------------------------
>
>                 Key: FLINK-28758
>                 URL: https://issues.apache.org/jira/browse/FLINK-28758
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Runtime / Checkpointing, Runtime / Task
>    Affects Versions: 1.15.0
>         Environment: Flink version:1.15.0
> Deploy mode: K8s application mode. A local mini cluster also has this problem.
> Kafka connector: uses the Kafka SourceFunction, not the new API.
>            Reporter: hjw
>            Priority: Major
>
> I posted a stop-with-savepoint request to the Flink job through the REST API.
> An error happened while the Kafka connector was closing.
> The job then entered the restarting state.
> It is successful to use savepoint command alone.
> {code:java}
> 13:33:42.857 [Kafka Fetcher for Source: nlp-kafka-source -> nlp-clean (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-hjw-3, groupId=hjw] Kafka consumer has been closed
> 13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean (1/1)#0] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-hjw-4 unregistered
> 13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-hjw-4, groupId=hjw] Kafka consumer has been closed
> 13:33:42.860 [Source: nlp-kafka-source -> nlp-clean (1/1)#0] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Cleanup StreamTask (operators closed: false, cancelled: false)
> 13:33:42.860 [jobmanager-io-thread-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 5 by task eeefcb27475446241861ad8db3f33144 of job d6fed247feab1c0bcc1b0dcc2cfb4736 at 79edfa88-ccc3-4140-b0e1-7ce8a7f8669f @ 127.0.0.1 (dataPort=-1).
> org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: nlp-kafka-source -> nlp-clean (1/1)#0 Failure reason: Task has failed.
>  at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
>  at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
>  at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>  at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
> Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>  ... 3 common frames omitted
> Caused by: org.apache.flink.util.SerializedThrowable: null
>  at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
>  at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
>  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
>  at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
>  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
>  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
>  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>  at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>  at java.lang.Thread.run(Thread.java:748)
> 13:34:00.925 [flink-akka.actor.default-dispatcher-21] DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Trigger heartbeat request.
> 13:34:00.926 [flink-akka.actor.default-dispatcher-22] DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
> 13:34:00.926 [flink-akka.actor.default-dispatcher-21] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from 85d44174e17281984d28699c42e3eed6.
> {code}


