Posted to user@beam.apache.org by Deepak Nagaraj <de...@primer.ai> on 2022/05/03 21:07:53 UTC

Re: Slow Beam pipeline gets Flink checkpoint timeouts upon Kafka messages

I ran a few more experiments. The gist is that I can get Flink to start
checkpoints even when Kafka backlog > 0, as long as my pipeline is fast
enough.

That is, if the pipeline sleeps for less than 0.6 seconds (empirical, take
with a grain of salt), I can see checkpoints getting started even when the
Kafka topic is not fully consumed. But when the pipeline sleeps for longer,
I have only ever seen it start checkpointing once the backlog reaches zero.

My question now is: why does checkpointing behave differently? i.e., what
allows Flink to start a checkpoint on a fast-enough pipeline?

Also posted on Flink mailing list. [1]

Thanks,
Deepak

[1] https://lists.apache.org/thread/5pwn41q84l4jd5pt3lpym67nw9k715qd

On Wed, Apr 27, 2022 at 9:22 PM Deepak Nagaraj <de...@primer.ai>
wrote:

> Hi Beam team,
>
> We're seeing checkpoint timeouts when running Apache Beam on Flink. They
> happen when the pipeline has a slow step and we send a batch of messages
> to Kafka.
>
> I have set up a similar pipeline on my laptop that reproduces the problem.
>
> Pipeline details:
> -----------------
>
> * Python, running a Beam pipeline on Flink via PortableRunner
> * Streaming
> * Read from Kafka
> * A slow beam.Map() call
> * WindowInto, Write to files
>
> The pipeline calls beam.Map() on a function with sleep(20 seconds).
>
> Our pipeline configuration:
> * Checkpoint interval = 30s
> * Checkpoint timeout = 60s
> * Fail on checkpointing errors = false
> * Max bundle size = 2
> * Parallelism = 1
>
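A minimal sketch of the pipeline and settings described above, for anyone who wants to reproduce this. It is not our actual code. The flag spellings and value formats are assumptions based on Beam's FlinkPipelineOptions, and the job endpoint, window size, and output path are hypothetical placeholders.

```python
import time


def slow_step(record, delay_s=20.0):
    """Simulate the slow beam.Map() step: sleep, then pass the value on."""
    _key, value = record  # ReadFromKafka emits (key, value) pairs of bytes
    time.sleep(delay_s)
    return value.decode("utf-8")


# Settings from the list above, written as Python pipeline flags.
# Assumption: these spellings mirror Beam's FlinkPipelineOptions.
FLINK_FLAGS = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",  # hypothetical job server address
    "--streaming",
    "--checkpointing_interval=30000",  # 30 s, in milliseconds
    "--checkpoint_timeout_millis=60000",  # 60 s, in milliseconds
    "--fail_on_checkpointing_errors=false",
    "--max_bundle_size=2",
    "--parallelism=1",
]


def build_pipeline(flags=FLINK_FLAGS):
    """Assemble Kafka read -> slow Map -> WindowInto -> file write."""
    # Imported lazily so the helpers above work without Beam installed.
    import apache_beam as beam
    from apache_beam.io import fileio
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.window import FixedWindows

    pipeline = beam.Pipeline(options=PipelineOptions(flags))
    (
        pipeline
        | "ReadKafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": "localhost:9092"},
            topics=["echo-input"],
        )
        | "SlowStep" >> beam.Map(slow_step)
        | "Window" >> beam.WindowInto(FixedWindows(60))  # window size is a guess
        | "Write" >> fileio.WriteToFiles(path="/tmp/echo-output")  # hypothetical path
    )
    return pipeline
```

Calling build_pipeline().run() would submit the job, assuming a Flink job server is listening on the endpoint given in the flags.
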
> Steps to reproduce:
> -------------------
>
> After the pipeline is running, we send 10 Kafka messages from a file as
> follows:
>         seq 1 10 > msgs.txt
        kcat -b localhost:9092 -P -k "testMessage" -t echo-input -D '\n' -l < msgs.txt
>
> What we expected to see:
> ------------------------
>
> Because we set bundle size = 2, we were expecting Beam to pick up 2 Kafka
> records at a time.
>
> Because each bundle would then take only 40 seconds (2 records x 20
> seconds), we would stay within the 60-second checkpoint timeout, so we
> were expecting all checkpoints to succeed.
>
> What we see instead:
> --------------------
>
> When we send the messages, every checkpoint that starts in the next 200
> seconds (20 seconds x 10 messages) fails with a timeout. After that,
> checkpoints start succeeding again.
>
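The two durations in this report can be checked with a few lines of arithmetic. This is only a sketch under a simplifying assumption: a checkpoint cannot complete while the slow step is still working through records it has already picked up. The constant names are made up for illustration. The 200-second figure is consistent with the 198275 ms delay in the TaskManager warning quoted further down.

```python
SLEEP_S = 20  # sleep inside the beam.Map() function
CHECKPOINT_TIMEOUT_S = 60
MAX_BUNDLE_SIZE = 2
MESSAGES_SENT = 10


def busy_seconds(records, sleep_s=SLEEP_S):
    """Time the slow step needs to work through `records` sequentially."""
    return records * sleep_s


def fits_in_timeout(busy_s, timeout_s=CHECKPOINT_TIMEOUT_S):
    """True if a checkpoint that waits `busy_s` seconds can still complete."""
    return busy_s <= timeout_s


expected = busy_seconds(MAX_BUNDLE_SIZE)  # one bundle of 2 records
observed = busy_seconds(MESSAGES_SENT)  # all 10 records processed first

print(expected, fits_in_timeout(expected))  # 40 True
print(observed, fits_in_timeout(observed))  # 200 False
```
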
> Based on the Flink TaskManager logs, it seems that the Kafka consumer has
> read up to the latest offset (the backlog used to be 0, but became 96
> bytes):
>
        2022-04-27 13:12:32,359 DEBUG org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0: backlog 96
>
> The backlog eventually drops to 0 after 200 seconds. Then the task
> attempts to send an acknowledgment, but by then it's too late.
>
        2022-04-27 13:15:53,300 WARN org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: 198275ms
>
> By this time, the Flink JobManager has already marked that checkpoint
> as failed and started a new one.
>
> Questions on Beam:
> ------------------
>
> We don't fully understand this Beam/Flink behavior, but with regard to
> Beam we wanted to ask:
>
> * Why does Beam seem to process all the Kafka records, even when we have
> set max bundle size = 2?
> * Alternatively, is there any way to limit how many records are read by
> beam.io.kafka.ReadFromKafka()?
>
> I'm happy to share additional logs or any other details I missed here.
>
> Many thanks,
> Deepak
>
>