Posted to user@flink.apache.org by Cristian Constantinescu <ze...@gmail.com> on 2022/01/28 21:42:38 UTC

EOS from checkpoints doesn't seem to work

Hi everyone,

I see this question asked a lot on the mailing list, but I can't seem to
find a solution to my problem. I would appreciate some help.

The requirement for our project is that we neither lose data nor produce
duplicate records. Our pipelines are written with Apache Beam (2.35.0) and
run on a single Flink (1.13.1) node (for now, while we transition to
Kubernetes).

We read a topic from Kafka, join it with another topic, and output the
result to a third topic. When we artificially introduce an exception into
the pipeline (by listening to a debug topic that throws an exception on
every message it receives), we observe that restarting the pipeline from
the last checkpoint does not pick up where it left off. I'm not really sure
why...

On the Beam side, the pipeline is configured with
.withReadCommitted().commitOffsetsInFinalize() and enable.auto.commit is
set to false for the consumer, and with .withEOS(1, "sink-group-name"). On
the Flink side, --externalizedCheckpointsEnabled is set to true, and
--checkpointingInterval is set to 1 minute.
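Concretely, the runner flags we pass look roughly like this (the jar name
is a placeholder, not our real artifact):

```shell
# Sketch of our Flink runner invocation; the jar name is a placeholder
# for our actual deployment artifact.
java -jar our-beam-pipeline.jar \
  --runner=FlinkRunner \
  --externalizedCheckpointsEnabled=true \
  --checkpointingInterval=60000 \
  --numberOfExecutionRetries=2
```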

I let the pipeline run for four checkpoints. Between checkpoints #2 and #3,
I observe that the Kafka consumer group for the main topic, which started
from the beginning of the topic, has already reached the end. I trigger the
exception between checkpoints #4 and #5, and the pipeline stops because
--numberOfExecutionRetries=2. When I restart the pipeline and specify the
metadata file in the chk-4 directory, I would expect it to continue
processing the items still pending after checkpoint #4 (an estimated 80k of
them). Unfortunately, no item is processed. Nothing is read from Kafka. The
pipeline just sits there waiting for new messages on the main topic.
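For reference, this is roughly how we restart from the retained checkpoint
(the checkpoint path and jar name are placeholders for our actual values):

```shell
# Resume the job from the retained checkpoint's metadata using Flink's
# -s flag; the path and jar name below are placeholders.
flink run -s /flink/checkpoints/<job-id>/chk-4/_metadata \
  our-beam-pipeline.jar \
  --runner=FlinkRunner
```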

Could anyone help me figure out what's going on? I'm sure it's a user
mistake, but I'm unsure how to debug it.

Cheers,
Cristian