Posted to user@flink.apache.org by "Griess, Andrew" <an...@sap.com> on 2015/12/15 16:45:32 UTC

global watermark across multiple kafka consumers

Hi guys,

I have a question about using watermarks with multiple FlinkKafkaConsumer082 instances. The aim is to have a global watermark across multiple Kafka consumers, where any message from any Kafka partition would update the same watermark. When testing a simple TimestampExtractor implementation, it seems that each consumer produces a separate watermark. Is there a prescribed way of handling this that anyone has experience with?

Thanks for your help,

Andrew Griess


Re: global watermark across multiple kafka consumers

Posted by Stephan Ewen <se...@apache.org>.
@Andrew: Just to make sure that there is no confusion:

Even though every Kafka source generates a local watermark, these
watermarks are merged when the streams are merged, for example in union()
or keyBy() steps.
The operator that merges streams tracks the current watermark of each
input stream and emits the minimum of them as its own watermark.

That way, most cases do not need any global watermark coordination.



Here is an example:

DataStream<X> kafka1 = env.addSource(new KafkaConsumer("topicA",
...)).setParallelism(2); // assume this has 2 Kafka partitions

DataStream<X> kafka2 = env.addSource(new KafkaConsumer("topicB",
...)).setParallelism(1); // assume this has 1 Kafka partition

kafka1.keyBy(...).map(...).union(kafka2);


Now assume this:

  - Kafka source 1 (subtask 1) emits watermark 17
  - Kafka source 1 (subtask 2) emits watermark 21
  - Kafka source 2 (subtask 1) emits watermark 5

  - The map() after the keyBy receives the watermarks from the two subtasks
of kafka1 and has its watermark at 17 (the min)

  - The union() has the watermark at 5

  - Kafka source 1 (subtask 1) emits watermark 24
  - Kafka source 1 (subtask 2) emits watermark 22
  - Kafka source 2 (subtask 1) emits watermark 27

  - The map() after the keyBy now has the watermark at 22
  - The union has the watermark at 22 as well now
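
The walk-through above can be replayed with a small, self-contained Java
sketch. This is not Flink code: WatermarkMerger is a hypothetical stand-in
for an operator that merges several inputs, and the sketch assumes a single
map() subtask that forwards its watermark to the union.

```java
import java.util.Arrays;

/** Hypothetical stand-in for an operator that merges several input channels. */
class WatermarkMerger {
    // Latest watermark seen on each input channel.
    private final long[] channelWatermarks;
    // Watermark of the operator itself; it never moves backwards.
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkMerger(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the operator's watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The operator can only be as far along as its slowest input.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    long current() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkMerger map = new WatermarkMerger(2);   // inputs: the two subtasks of kafka1
        WatermarkMerger union = new WatermarkMerger(2); // inputs: map output and kafka2

        // First round: 17 and 21 from kafka1, 5 from kafka2.
        union.onWatermark(0, map.onWatermark(0, 17));
        union.onWatermark(0, map.onWatermark(1, 21));
        union.onWatermark(1, 5);
        System.out.println("map=" + map.current() + " union=" + union.current());

        // Second round: 24 and 22 from kafka1, 27 from kafka2.
        union.onWatermark(0, map.onWatermark(0, 24));
        union.onWatermark(0, map.onWatermark(1, 22));
        union.onWatermark(1, 27);
        System.out.println("map=" + map.current() + " union=" + union.current());
    }
}
```

Each merger keeps one slot per input and reports the minimum over all slots,
which is why the slow kafka2 source holds the union back at 5 in the first
round while the map() is already at 17.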


Hope that illustrates Flink's mechanism a bit. Do you think this mechanism
handles your watermark coordination?

Greetings,
Stephan



On Wed, Dec 16, 2015 at 11:06 AM, Till Rohrmann <tr...@apache.org>
wrote:

> Hi Andrew,
>
> as far as I know, there is no prescribed way of handling this kind of
> situation. [...]


Re: global watermark across multiple kafka consumers

Posted by Till Rohrmann <tr...@apache.org>.
Hi Andrew,

as far as I know, there is no prescribed way of handling this kind of
situation. If you want to synchronize watermark generation across a set of
KafkaConsumers, you need some kind of ground truth.

This could be, for example, a central registry such as ZooKeeper in which
you collect the current watermarks of the different consumers. You could
access ZooKeeper from inside the TimestampExtractor.

Alternatively, though a bit more hacky, you could exploit the fact that the
consumer tasks are usually colocated with consumer tasks from different
topics. This means that you'll have multiple subtasks reading from the
different Kafka topics running in the same JVM. You could then use class
variables to synchronize the watermarks. But this assumes that each subtask
reading the topic t from Kafka is colocated with at least one other subtask
reading the topic t' from Kafka with t' in T \ {t} and T being the set of
Kafka topics. By default this should be the case.
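
A minimal sketch of the class-variable idea, in plain Java with no Flink
dependency. Everything here is hypothetical and for illustration only: the
SharedWatermark class, its method names, and the consumer ids are not part
of any Flink API. It also assumes that "global" means the minimum over all
consumers that have reported so far; a different policy would change only
global().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical JVM-wide watermark registry backed by a class variable. */
final class SharedWatermark {
    // Static state is visible only to subtasks running in the same JVM
    // (and loaded by the same class loader), as the text above assumes.
    private static final Map<String, Long> LATEST = new ConcurrentHashMap<>();

    private SharedWatermark() {}

    /** A consumer subtask reports its local watermark; regressions are ignored. */
    static void report(String consumerId, long watermark) {
        LATEST.merge(consumerId, watermark, Long::max);
    }

    /** Minimum over all consumers that have reported so far. */
    static long global() {
        return LATEST.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    /** Clears the registry, for example between test runs. */
    static void reset() {
        LATEST.clear();
    }

    public static void main(String[] args) {
        report("topicA-0", 17);
        report("topicA-1", 21);
        report("topicB-0", 5);
        System.out.println("global watermark: " + global());
    }
}
```

A timestamp extractor could call report() with its own id whenever its local
watermark advances and return global() as the watermark it emits. Subtasks in
other JVMs would not see these updates, which is the colocation caveat
described above.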

I'm wondering why you need a global watermark for your Kafka topics. Isn't
it enough to have individual watermarks for each topic?

Cheers,
Till
