Posted to user-zh@flink.apache.org by Yun Tang <my...@live.com> on 2020/01/15 19:23:44 UTC

Re: When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the StreamTask checkpoint fails.

Hi

The root cause is that the checkpoint failed because the sink could not send data to Kafka during 'preCommit'. The proper solution is to make sure data can be sent to Kafka successfully, which is mostly within the scope of Kafka itself.
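
The TimeoutException in the quoted log ("Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation") means the producer gave up on records that had waited unsent for about 120 s. The sketch below models that check. It assumes the producer runs kafka-clients 2.1 or newer, where the delivery.timeout.ms setting (default 120000 ms) bounds how long a record may wait; the class and method names are hypothetical and this is not kafka-clients code.

```java
import java.util.Locale;

// Hypothetical, simplified model of producer-side batch expiry (not kafka-clients code).
final class BatchExpiryModel {
    // Assumed default of delivery.timeout.ms in kafka-clients 2.1 and later.
    static final long DEFAULT_DELIVERY_TIMEOUT_MS = 120_000L;

    // A batch expires once it has waited at least deliveryTimeoutMs
    // without its send completing.
    static boolean isExpired(long createdMs, long nowMs, long deliveryTimeoutMs) {
        return nowMs - createdMs >= deliveryTimeoutMs;
    }

    // Builds a message shaped like the TimeoutException text in the quoted log.
    static String expiryMessage(int recordCount, String topicPartition, long createdMs, long nowMs) {
        return String.format(Locale.ROOT,
                "Expiring %d record(s) for %s:%d ms has passed since batch creation",
                recordCount, topicPartition, nowMs - createdMs);
    }
}
```

For example, a batch of 58 records for k8s-test-data-0 created at t = 0 ms and checked at t = 120018 ms is expired under the assumed 120000 ms default, and expiryMessage(58, "k8s-test-data-0", 0, 120018) reproduces the text in the log. A larger timeout only delays this: if the partition leader stays unreachable, the batch still expires.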

If you cannot guarantee Kafka's health from the client side and do not need exactly-once (or at-least-once) delivery, you can pass FlinkKafkaProducer.Semantic.NONE when creating the Kafka producer, so that it does not flush pending data during 'preCommit'.
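
To make the difference concrete, here is a simplified, hypothetical model of the sink's 'preCommit' step; it is not FlinkKafkaProducer's source, and all names in it are made up. It assumes, as the stack trace suggests (preCommit calling flush), that the producer flushes in-flight records during 'preCommit' for EXACTLY_ONCE and AT_LEAST_ONCE but not for NONE. It also assumes the three-argument constructor in the quoted code defaults to AT_LEAST_ONCE, which is why the flush happens there.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a two-phase-commit Kafka sink's preCommit step.
// Names are illustrative; this is not FlinkKafkaProducer's implementation.
final class PreCommitModel {
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    static final class SendFailedException extends Exception {
        SendFailedException(String message) { super(message); }
    }

    private final Semantic semantic;
    // Outcome of each send not yet completed: true = will succeed, false = will fail.
    private final List<Boolean> inFlight = new ArrayList<>();
    // First error reported by a send callback, or null if none so far.
    private String asyncError;

    PreCommitModel(Semantic semantic) { this.semantic = semantic; }

    void send(boolean willSucceed) { inFlight.add(willSucceed); }

    // Waits (here: instantly) until every in-flight send has completed,
    // recording the first failure the way an asynchronous callback would.
    private void flush() {
        for (boolean ok : inFlight) {
            if (!ok && asyncError == null) {
                asyncError = "Failed to send data to Kafka";
            }
        }
        inFlight.clear();
    }

    // Called when the checkpoint barrier reaches the sink.
    void preCommit() throws SendFailedException {
        if (semantic != Semantic.NONE) {
            flush(); // EXACTLY_ONCE / AT_LEAST_ONCE wait for pending records
        }
        // In this model callbacks only fire inside flush(); in a real producer they
        // fire asynchronously, so an error that already arrived would still surface here.
        if (asyncError != null) {
            throw new SendFailedException(asyncError);
        }
    }
}
```

The trade-off: with NONE the checkpoint no longer waits for Kafka, so records still in flight when a checkpoint completes are not covered by it, which is why NONE offers no delivery guarantee.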

If you don't want the job to fail because of checkpoint errors, you can increase the number of tolerable checkpoint failures:

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);
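
A sketch of how that threshold is applied, assuming (from our reading of Flink 1.9's CheckpointFailureManager) that Flink counts consecutive task-level checkpoint failures, fails the job once the count exceeds the configured number, and resets the count when a checkpoint completes. The class and method names are hypothetical. With an effective threshold of 0, the very first declined checkpoint fails the job, which appears to match the log, where checkpoint 1 is declined and the job switches to FAILING.

```java
// Hypothetical, simplified model of counting consecutive checkpoint failures
// against setTolerableCheckpointFailureNumber (not Flink's implementation).
final class TolerableFailureModel {
    private final int tolerableFailures;
    private int continuousFailures;

    TolerableFailureModel(int tolerableFailures) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
    }

    // Records a declined checkpoint; returns true if the job should now fail.
    boolean onCheckpointFailure() {
        continuousFailures++;
        return continuousFailures > tolerableFailures;
    }

    // A completed checkpoint resets the consecutive-failure count.
    void onCheckpointSuccess() {
        continuousFailures = 0;
    }
}
```

Raising the threshold only buys time: if Kafka stays unreachable, every checkpoint keeps failing and the job still fails once the consecutive count exceeds the threshold.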

Best
Yun Tang
________________________________
From: jose farfan <jo...@gmail.com>
Sent: Wednesday, January 15, 2020 23:21
To: ouywl <ou...@139.com>
Cc: user <us...@flink.apache.org>; user-zh@flink.apache.org <us...@flink.apache.org>
Subject: Re: When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the StreamTask checkpoint fails.

Hi

I have the same issue.

BR
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl <ou...@139.com> wrote:
Hi all:
When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the error shown in Log-1 occurs. The code is:

input.addSink(
        new FlinkKafkaProducer<KafkaEvent>(
                parameterTool.getRequired("bootstrap.servers"),
                parameterTool.getRequired("output-topic"),
                new KafkaEventDeSchema()));

Log-1:
2020-01-09 09:13:44,476 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
2020-01-09 09:15:33,069 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
2020-01-09 09:15:33,070 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    ... 17 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
2020-01-09 09:15:33,074 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job producer data frequece (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1443)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1353)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:722)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Best,
Ouywl