Posted to user@flink.apache.org by Daniel Peled <da...@gmail.com> on 2020/12/28 06:57:21 UTC

Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

Hello,

We have 2 Flink jobs that communicate with each other through a Kafka topic.
Both jobs use checkpoints with EXACTLY ONCE semantics.

We have seen the following behaviour and want to check whether it is
expected or a bug.

When the first job produces a message to Kafka, the second job consumes it
with a latency that depends on the *first* job's *checkpoint interval*.

We are able to read the message using the Kafka tool or another Kafka
consumer, but NOT with a Flink Kafka consumer, whose latency again depends
on the checkpoint interval of the first job.

How come the consumer of the second job depends on the producer transaction
commit time of the first job?

BR,
Danny

Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

Posted by Daniel Peled <da...@gmail.com>.
Thank you for your answers.
We ended up changing the isolation level to read_uncommitted, which solves
the latency problem between the two jobs. We understand that we may get
duplicates in the second job when the first job fails and rolls back.

On Tue, Jan 5, 2021 at 15:23, 赵一旦 <hi...@gmail.com> wrote:

> I think what you need is
> http://kafka.apache.org/documentation/#consumerconfigs_isolation.level .
>
> The default value of the isolation.level setting is read_uncommitted, so
> maybe you are not using the default setting?

Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

Posted by 赵一旦 <hi...@gmail.com>.
I think what you need is
http://kafka.apache.org/documentation/#consumerconfigs_isolation.level .

The default value of the isolation.level setting is read_uncommitted, so
maybe you are not using the default setting?

On Tue, Jan 5, 2021 at 9:10 PM, 赵一旦 <hi...@gmail.com> wrote:

> I do not have this problem, so I guess it is related to the configuration of
> your Kafka producer and consumer, and maybe to Kafka topic or
> server properties as well.

Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

Posted by 赵一旦 <hi...@gmail.com>.
I do not have this problem, so I guess it is related to the configuration of
your Kafka producer and consumer, and maybe to Kafka topic or
server properties as well.

On Tue, Jan 5, 2021 at 6:47 PM, Arvid Heise <ar...@ververica.com> wrote:

> Hi Daniel,
>
> Flink commits transactions on checkpoints, while Kafka Streams/Connect
> usually commits on record. This is the typical tradeoff between throughput
> and latency. By decreasing the checkpoint interval in Flink, you can reach
> latency comparable to Kafka Streams.
>
> If you have two exactly-once jobs, the second job may only read data that
> has been committed (no dirty reads, as Chesnay said). If the second job were
> to consume uncommitted data, it would produce duplicates whenever the
> first job fails and rolls back.
>
> You can configure the read behavior with isolation.level. If you want to
> implement exactly-once behavior, you also need to set that level to
> read_committed in your other Kafka consumers [1]. Also compare what Kafka
> Streams sets if you want exactly-once [2].
>
> If you really want low latency, please also double-check whether you really
> need exactly-once.
>
> [1]
> https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
> [2]
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#processing-guarantee

Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

Posted by Arvid Heise <ar...@ververica.com>.
Hi Daniel,

Flink commits transactions on checkpoints, while Kafka Streams/Connect
usually commits on record. This is the typical tradeoff between throughput
and latency. By decreasing the checkpoint interval in Flink, you can reach
latency comparable to Kafka Streams.
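
To make the dependency on the checkpoint interval concrete, here is a rough
sketch of the waiting time. It is a toy model, not Flink code: it assumes the
producer commits exactly at multiples of the checkpoint interval and that
checkpoints complete instantly. The class and method names are made up for
illustration.

```java
// Toy model, not Flink code: a transactional producer that commits only at
// checkpoint boundaries, observed by a read_committed consumer.
final class CommitLatencyModel {

    /**
     * Returns how long (in ms) a record written at writeTimeMs stays invisible
     * to a read_committed consumer. Assumptions: commits happen exactly at
     * multiples of checkpointIntervalMs and checkpoints complete instantly.
     */
    static long visibilityDelayMs(long writeTimeMs, long checkpointIntervalMs) {
        if (writeTimeMs < 0 || checkpointIntervalMs <= 0) {
            throw new IllegalArgumentException("time must be >= 0 and interval > 0");
        }
        long sinceLastCommit = writeTimeMs % checkpointIntervalMs;
        // In this model a record written exactly on a boundary misses that
        // commit and waits one full interval.
        return checkpointIntervalMs - sinceLastCommit;
    }

    public static void main(String[] args) {
        long[] intervalsMs = {60_000L, 10_000L, 1_000L};
        for (long interval : intervalsMs) {
            System.out.printf("interval=%d ms, record written at t=2500 ms waits %d ms%n",
                    interval, visibilityDelayMs(2_500L, interval));
        }
    }
}
```

In this model a record written at t=2500 ms waits 57500 ms with a 60 s
interval but only 500 ms with a 1 s interval. In a real job the checkpoint
duration adds to the wait.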

If you have two exactly-once jobs, the second job may only read data that
has been committed (no dirty reads, as Chesnay said). If the second job were
to consume uncommitted data, it would produce duplicates whenever the
first job fails and rolls back.
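
The duplicate scenario can be illustrated with a small in-memory simulation.
This is only a sketch: the class below is a made-up toy model of a
transactional topic, not the Kafka client API, and the transaction ids and
values are invented.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy in-memory model of a transactional topic. It is NOT the Kafka client API.
final class DirtyReadDemo {

    // One record in the toy log, tagged with the transaction that wrote it.
    private static final class Entry {
        final int txId;
        final String value;

        Entry(int txId, String value) {
            this.txId = txId;
            this.value = value;
        }
    }

    private final List<Entry> log = new ArrayList<>();
    private final Set<Integer> committedTx = new HashSet<>();

    void write(int txId, String value) {
        log.add(new Entry(txId, value));
    }

    void commit(int txId) {
        committedTx.add(txId);
    }

    // Models isolation.level=read_uncommitted: returns every record in the log.
    List<String> readUncommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            out.add(e.value);
        }
        return out;
    }

    // Models isolation.level=read_committed: skips records whose transaction
    // was never committed.
    List<String> readCommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (committedTx.contains(e.txId)) {
                out.add(e.value);
            }
        }
        return out;
    }

    // The first job writes "a" and "b" in transaction 1, fails before the
    // checkpoint commits it, restarts, and writes the same values again in
    // transaction 2, which is committed.
    static DirtyReadDemo failureScenario() {
        DirtyReadDemo topic = new DirtyReadDemo();
        topic.write(1, "a");
        topic.write(1, "b");
        topic.write(2, "a");
        topic.write(2, "b");
        topic.commit(2);
        return topic;
    }

    public static void main(String[] args) {
        DirtyReadDemo topic = failureScenario();
        System.out.println("read_uncommitted sees: " + topic.readUncommitted());
        System.out.println("read_committed sees:   " + topic.readCommitted());
    }
}
```

The read_uncommitted view contains a, b, a, b, while the read_committed view
contains a, b only once.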

You can configure the read behavior with isolation.level. If you want to
implement exactly-once behavior, you also need to set that level to
read_committed in your other Kafka consumers [1]. Also compare what Kafka
Streams sets if you want exactly-once [2].
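
As a minimal sketch, the consumer properties could be built like this. The
bootstrap address and group id are placeholders, and the class and method
names are made up. Only the isolation.level key and its two values come from
the Kafka consumer configuration [1].

```java
import java.util.Properties;

// Builds plain Kafka consumer properties using only java.util.Properties.
final class ConsumerIsolationConfig {

    static Properties consumerProps(boolean readCommitted) {
        Properties props = new Properties();
        // Placeholder values; replace them with your own cluster and group.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "second-job");
        // read_committed hides records of open or aborted transactions;
        // read_uncommitted returns them as soon as they are written.
        props.setProperty("isolation.level",
                readCommitted ? "read_committed" : "read_uncommitted");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps(true).getProperty("isolation.level"));
        System.out.println(consumerProps(false).getProperty("isolation.level"));
    }
}
```

A Properties object like this is what the Flink Kafka consumer constructor
takes alongside the topic and deserialization schema, so the same key should
apply to the second job.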

If you really want low latency, please also double-check whether you really
need exactly-once.

[1] https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
[2]
https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#processing-guarantee

On Mon, Dec 28, 2020 at 12:22 PM Chesnay Schepler <ch...@apache.org>
wrote:

> I don't know our Kafka connector particularly well, but it sounds like a
> matter of whether a given consumer does dirty reads.
> Flink does not, whereas the other tools you're using do.

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>


Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

Posted by Chesnay Schepler <ch...@apache.org>.
I don't know our Kafka connector particularly well, but it sounds like a
matter of whether a given consumer does dirty reads.
Flink does not, whereas the other tools you're using do.

On 12/28/2020 7:57 AM, Daniel Peled wrote:
> Hello,
>
> We have 2 Flink jobs that communicate with each other through a Kafka
> topic.
> Both jobs use checkpoints with EXACTLY ONCE semantics.
>
> We have seen the following behaviour and want to check whether it is
> expected or a bug.
>
> When the first job produces a message to Kafka, the second job consumes
> it with a latency that depends on the *first* job's *checkpoint interval*.
>
> We are able to read the message using the Kafka tool or another Kafka
> consumer, but NOT with a Flink Kafka consumer, whose latency again depends
> on the checkpoint interval of the first job.
>
> How come the consumer of the second job depends on the producer
> transaction commit time of the first job?
>
> BR,
> Danny