Posted to user@flink.apache.org by nick toker <ni...@gmail.com> on 2020/12/23 09:10:32 UTC

Long latency when consuming a message from KAFKA and checkpoint is enabled

Hello

We noticed the following behavior:
When Flink checkpoints are enabled, there is a delay between the time we
write a message to the Kafka topic and the time the Flink Kafka connector
consumes this message.
The delay is closely related to checkpointInterval and/or
minPauseBetweenCheckpoints, meaning that the maximum delay when consuming a
message from Kafka is one of these parameters.

If we disable checkpoints, the message is consumed immediately.
We work with EXACTLY_ONCE semantics.
Please note that we inject only one message.

Could you please advise how we can remove/control this delay?

Please see the attached code of AbstractFetcher and KafkaFetcher (as a png
file).
(For example, emitRecordsWithTimestamps() takes a lock on checkpointLock.)
Could this explain the behaviour?


BR

Re: Long latency when consuming a message from KAFKA and checkpoint is enabled

Posted by Arvid Heise <ar...@ververica.com>.
Hi Nick,

I'm not entirely sure that I understand your setup correctly.

Basically, when enabling exactly once and checkpointing, Flink will only
consume messages that have been committed.
If you chain two Flink jobs with an intermediate Kafka topic, then the
first Flink job will only commit messages on checkpoints and thus the
second Flink job will only read these messages with a delay up to the
checkpoint interval.
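
To make the "delay up to the checkpoint interval" concrete, here is a minimal
sketch of the timing. It is a simplified model, not Flink code: it assumes
checkpoints complete instantly at exact multiples of the interval, and the
class and method names are hypothetical.

```java
public class CommitDelayModel {

    /**
     * Time at which a record written at writeTimeMs becomes visible to a
     * reader that only sees committed data. Assumption: a checkpoint
     * completes instantly at every multiple of checkpointIntervalMs and
     * commits everything written before it.
     */
    public static long visibleAtMs(long writeTimeMs, long checkpointIntervalMs) {
        if (writeTimeMs < 0 || checkpointIntervalMs <= 0) {
            throw new IllegalArgumentException(
                    "write time must be >= 0 and interval must be > 0");
        }
        // A record written exactly on a boundary joins the next transaction.
        return (writeTimeMs / checkpointIntervalMs + 1) * checkpointIntervalMs;
    }

    /** Delay between writing a record and the commit that makes it visible. */
    public static long delayMs(long writeTimeMs, long checkpointIntervalMs) {
        return visibleAtMs(writeTimeMs, checkpointIntervalMs) - writeTimeMs;
    }

    public static void main(String[] args) {
        long intervalMs = 5_000;
        for (long t : new long[] {0, 1_500, 4_999}) {
            System.out.println("written at " + t + " ms, visible after "
                    + delayMs(t, intervalMs) + " ms");
        }
    }
}
```

In this model the delay is never larger than the interval, and a single
injected message sees a delay that depends only on where in the interval it
was written.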

Now if your input record is created with a different tool, make sure that
you commit it immediately. Then, Flink should immediately also process that
record. However, note that Flink again writes the record in a transaction.
Thus, if your tests involve you checking for the output, you would need to
configure your reader to read uncommitted data [1].
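
As an illustration of the reader setting referred to in [1], the sketch
below builds plain consumer properties. Only the isolation.level key and its
two values come from the Kafka documentation; the broker address, group id,
and class name are placeholders. Kafka documents read_uncommitted as the
default, so this matters mainly if the reader was switched to
read_committed.

```java
import java.util.Properties;

public class TestReaderConfig {

    /**
     * Builds properties for a test reader of the output topic.
     * The broker address and group id are placeholders for illustration.
     */
    public static Properties readerProperties(boolean readUncommitted) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "latency-test-reader");     // placeholder
        // read_uncommitted: records are visible before the transaction commits.
        // read_committed: records are visible only after the commit.
        props.setProperty("isolation.level",
                readUncommitted ? "read_uncommitted" : "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(readerProperties(true).getProperty("isolation.level"));
    }
}
```

These properties would then be passed to whatever consumer the test uses.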

You can decrease the latency by decreasing the checkpointing interval. If
you have a need for very low latency, you might also check if you really
need exactly once (that's typically not necessary).
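
When lowering the interval, note that the minimum pause between checkpoints
also limits how often commits can happen. The sketch below is a rough model
under the assumption that the next checkpoint starts no earlier than one
interval after the previous start and no earlier than the minimum pause
after the previous one completed. The class name, method name, and
durationMs parameter are hypothetical.

```java
public class CheckpointPeriodModel {

    /**
     * Approximate time between two consecutive checkpoint starts.
     * Assumption: the configured interval and the minimum pause after the
     * previous checkpoint (which took durationMs) must both be respected.
     */
    public static long effectivePeriodMs(long intervalMs, long minPauseMs, long durationMs) {
        if (intervalMs <= 0 || minPauseMs < 0 || durationMs < 0) {
            throw new IllegalArgumentException("invalid timing parameters");
        }
        return Math.max(intervalMs, durationMs + minPauseMs);
    }

    public static void main(String[] args) {
        // A short interval with a long minimum pause: the pause dominates.
        System.out.println(effectivePeriodMs(1_000, 5_000, 200));
        // A short pause: the interval dominates.
        System.out.println(effectivePeriodMs(1_000, 500, 200));
    }
}
```

Under this model, reducing only the interval has no effect once the minimum
pause plus the checkpoint duration exceeds it, which fits the observation
that the delay tracks one of the two parameters.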

[1] https://kafka.apache.org/documentation/#consumerconfigs_isolation.level

-- 

Arvid Heise | Senior Java Developer

Re: Long latency when consuming a message from KAFKA and checkpoint is enabled

Posted by Danny Chan <da...@apache.org>.
Hi Nick,
The behavior is as expected, because the Kafka source/sink relies on
checkpoints to provide exactly-once write semantics. A checkpoint
snapshots the state at a point in time, and that snapshot is used for
recovery. The Kafka sink currently writes to Kafka but only commits those
writes when a checkpoint completes.
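
The write-then-commit behaviour can be pictured with a small in-memory
sketch. This is not the Flink Kafka sink; it is a hypothetical model in
which records stay pending until a checkpoint completes, and all names are
made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionalSinkModel {

    // Records written in the currently open transaction.
    private final List<String> pending = new ArrayList<>();
    // Records whose transaction has been committed.
    private final List<String> committed = new ArrayList<>();

    /** Writes a record into the open transaction without committing it. */
    public void write(String record) {
        pending.add(record);
    }

    /** Commits everything written since the previous checkpoint. */
    public void onCheckpointComplete() {
        committed.addAll(pending);
        pending.clear();
    }

    /** What a reader restricted to committed data would see. */
    public List<String> readCommitted() {
        return new ArrayList<>(committed);
    }

    /** What a reader that also sees uncommitted data would see. */
    public List<String> readUncommitted() {
        List<String> all = new ArrayList<>(committed);
        all.addAll(pending);
        return all;
    }

    public static void main(String[] args) {
        TransactionalSinkModel sink = new TransactionalSinkModel();
        sink.write("message-1");
        System.out.println("before checkpoint: " + sink.readCommitted());
        sink.onCheckpointComplete();
        System.out.println("after checkpoint:  " + sink.readCommitted());
    }
}
```

A single injected message therefore stays invisible to a committed-only
reader until the next checkpoint completes.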

For your needs, I guess you want near-real-time writes while still keeping
exactly-once semantics. I'm sorry to say that there is no infrastructure
other than checkpoints that we can use for exactly-once semantics.

Re: Long latency when consuming a message from KAFKA and checkpoint is enabled

Posted by nick toker <ni...@gmail.com>.
Hi

Any idea?
Is it a bug?

Regards,
Nick