Posted to user@flink.apache.org by 祁明良 <mq...@xiaohongshu.com> on 2018/08/22 09:08:48 UTC

Batch expired in FlinkKafkaProducer09

Hi All,

When using FlinkKafkaProducer09 (Flink version 1.4.2), I'm seeing a Kafka "Batch Expired" error whenever a checkpoint starts. The error log is attached below.

Here is what I have found so far:
1. The error occurs only, and always, when a checkpoint starts.
2. The error does not seem to be related to the flushOnCheckpoint setting, since it is detected before the flush check.
3. checkErroneous is called at the beginning of both FlinkKafkaProducerBase.invoke and FlinkKafkaProducerBase.snapshotState, so I don't know why the invoke method works fine.
4. The same code has no problem writing to another Kafka cluster. (We just got a new Kafka server to migrate to :)
5. The Kafka server actually runs version 0.11. This job needs to consume from 0.9 and write to 0.11, so we used the 09 connector.
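
To make points 2 and 3 concrete, here is a minimal sketch of the pattern as I understand it, not the actual Flink source. The class AsyncErrorSketch and the method onSendCompleted are hypothetical stand-ins; only the names invoke, snapshotState, checkErroneous and flushOnCheckpoint come from the connector. The assumption is that a failed send is recorded by a callback on the producer's I/O thread and is rethrown by whichever of invoke or snapshotState runs next.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the sink; it does not talk to Kafka.
class AsyncErrorSketch {
    // Set by the send callback, read by the task thread.
    private volatile Exception asyncException;
    private final AtomicLong pendingRecords = new AtomicLong();
    private final boolean flushOnCheckpoint;

    AsyncErrorSketch(boolean flushOnCheckpoint) {
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    // Called once per record on the task thread.
    void invoke(String record) throws Exception {
        checkErroneous();                 // surfaces an earlier async failure
        pendingRecords.incrementAndGet(); // a real sink would call producer.send(...) here
    }

    // Hypothetical name: what the send callback would do when a send completes.
    void onSendCompleted(Exception error) {
        if (error != null && asyncException == null) {
            asyncException = error;       // remember only the first failure
        }
        pendingRecords.decrementAndGet();
    }

    // Called when a checkpoint is taken.
    void snapshotState() throws Exception {
        checkErroneous();                 // runs before any flush logic (point 2)
        if (flushOnCheckpoint) {
            // A real sink would flush the producer here and then re-check.
            checkErroneous();
        }
    }

    // Rethrows the recorded failure once, then clears it.
    void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        AsyncErrorSketch sink = new AsyncErrorSketch(false);
        try {
            sink.invoke("record-1");
            // Simulate the producer reporting an expired batch asynchronously.
            sink.onSendCompleted(new RuntimeException("Batch Expired"));
            sink.snapshotState();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the error is raised by snapshotState only because no invoke call happened between the failed callback and the checkpoint. It does not explain why, in my job, the failure always lines up with a checkpoint.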

Best,
Mingliang

java.lang.Exception: Error while triggering checkpoint 3 for Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2 -> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1210)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not perform checkpoint 3 for operator Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2 -> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226).
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:544)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:111)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1199)
... 5 more
Caused by: java.lang.Exception: Could not complete snapshot 3 for operator Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2 -> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226).
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:378)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:538)
... 7 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:350)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
... 12 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
