Posted to user@flink.apache.org by Juho Autio <ju...@rovio.com> on 2017/12/01 12:43:55 UTC

Re: Kafka consumer to sync topics by event time?

Thanks for the answers. I still don't understand why I can see the offsets
being committed to Kafka so quickly for the "small topic". Are they committed
to Kafka before their watermark has passed on Flink's side? That would be
quite confusing. Then again, since Flink handles the state/offsets internally,
the consumer offsets are committed to Kafka just for reference.

Otherwise, what you're saying sounds very good to me. The documentation
just doesn't explicitly say anything about how it works across topics.

On Kien's answer: "When you join multiple stream with different
watermarks", note that I'm not joining any topics myself; I get them as a
single stream from the Flink Kafka consumer, based on the list of topics
that I asked for.

Thanks,
Juho

On Wed, Nov 22, 2017 at 2:57 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi!
>
> The FlinkKafkaConsumer can handle watermark advancement with
> per-Kafka-partition awareness (across partitions of different topics).
> You can see an example of how to do that here [1].
>
> Basically, this generates watermarks within the Kafka consumer
> individually for each Kafka partition, and the per-partition watermarks
> are aggregated and emitted from the consumer in the same way that
> watermarks are aggregated on a stream shuffle: a watermark is emitted
> from the consumer only when the low watermark advances across all
> partitions.
>
> Therefore, this helps avoid the problem that you described, in which a
> "big_topic" has subscribed partitions that lag behind others. In that
> case, with the above feature in use, event time advances along with
> the lagging "big_topic" partitions, so messages are not recognized as
> late and discarded.
>
> Cheers,
> Gordon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
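A minimal Java simulation of the per-partition aggregation Gordon describes, assuming a fixed bounded out-of-orderness for every partition and partitions identified by hypothetical "topic-partition" strings. It illustrates the idea only and is not the FlinkKafkaConsumer's internal code: the source task keeps the highest watermark seen for each partition and emits the minimum across all partitions, and only when that minimum advances.

```java
import java.util.HashMap;
import java.util.Map;

// Simulation sketch, not Flink's implementation: per-partition watermarks,
// aggregated by taking the minimum across all partitions of one source task.
class PartitionAwareWatermarks {
    // Assumed bounded out-of-orderness, applied to every partition (milliseconds).
    private final long maxOutOfOrdernessMs;
    // Highest watermark generated so far for each partition, keyed by "topic-partition".
    private final Map<String, Long> perPartition = new HashMap<>();
    // Last watermark emitted downstream by this source task.
    private long emitted = Long.MIN_VALUE;

    PartitionAwareWatermarks(long maxOutOfOrdernessMs, Iterable<String> partitions) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        for (String p : partitions) {
            perPartition.put(p, Long.MIN_VALUE); // no data seen yet for this partition
        }
    }

    // Returns the new task-level watermark if it advanced, otherwise null.
    Long onRecord(String partition, long eventTimestampMs) {
        long candidate = eventTimestampMs - maxOutOfOrdernessMs;
        // A partition's watermark never moves backwards.
        perPartition.merge(partition, candidate, Long::max);
        long min = Long.MAX_VALUE;
        for (long w : perPartition.values()) {
            min = Math.min(min, w);
        }
        if (min > emitted) {
            emitted = min;
            return min;
        }
        return null;
    }

    long currentWatermark() {
        return emitted;
    }
}
```

With partitions "small_topic-0" and "big_topic-0", records from the small topic alone emit nothing until the big topic produces data; from then on the emitted watermark tracks whichever partition is furthest behind.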

Re: Kafka consumer to sync topics by event time?

Posted by Fabian Hueske <fh...@gmail.com>.
Awesome!

I've given you contributor permissions and assigned FLINK-9183 to you. With
the permissions you can also do that yourself in the future.
Here's a guide for contributions to the documentation [1].

Best, Fabian

[1] http://flink.apache.org/contribute-documentation.html

2018-04-16 15:38 GMT+02:00 Juho Autio <ju...@rovio.com>:

> Great. I'd be happy to contribute. I added 2 sub-tasks in
> https://issues.apache.org/jira/browse/FLINK-5479.
>
> Could someone with the required privileges assign this sub-task to me?
> https://issues.apache.org/jira/browse/FLINK-9183
>
> On Mon, Apr 16, 2018 at 3:14 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Fully agree Juho!
>>
>> Do you want to contribute the docs fix?
>> If yes, we should update FLINK-5479 to make sure that the warning is
>> removed once the bug is fixed.
>>
>> Thanks, Fabian
>>
>> 2018-04-12 9:32 GMT+02:00 Juho Autio <ju...@rovio.com>:
>>
>>> Looks like the bug https://issues.apache.org/jira/browse/FLINK-5479
>>> entirely prevents this feature from being used if there are any idle
>>> partitions. It would be nice to mention in the documentation that currently
>>> this requires all subscribed partitions to have a constant stream of data
>>> with growing timestamps. When the watermark stalls on an idle partition,
>>> it blocks everything.
>>>
>>> Link to current documentation:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>
>>> On Mon, Dec 4, 2017 at 4:29 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> You are right, offsets cannot be used for tracking processing progress.
>>>> I think setting Kafka offsets with respect to some progress notion other
>>>> than "has been consumed" would be highly application-specific and hard to
>>>> generalize.
>>>> As you said, there might be a window (such as a session window) that is
>>>> open much longer than all other windows and which would hold back the
>>>> offset. Other applications might not use the built-in windows at all, but
>>>> rather custom ProcessFunctions.
>>>>
>>>> Have you considered tracking progress using watermarks?
>>>>
>>>> 2017-12-04 14:42 GMT+01:00 Juho Autio <ju...@rovio.com>:
>>>>
>>>>> Thank you Fabian. Really clear explanation. That indeed matches my
>>>>> observation (data is not dropped from either the small or the big topic, but
>>>>> the offsets advance on the Kafka side before the corresponding records have
>>>>> been triggered from a window operator).
>>>>>
>>>>> This means that it's a bit harder to meaningfully monitor the job's
>>>>> progress solely based on Kafka consumer offsets. Is there a reason why
>>>>> Flink couldn't instead commit the offsets after they have been triggered
>>>>> from downstream windows? I could imagine that this might pose a problem if
>>>>> there are any windows that remain open for a very long time, but in general
>>>>> it would be useful IMHO. Or Flink could even commit both (read vs.
>>>>> triggered) offsets to Kafka for monitoring purposes.
>>>>>
>>>>> On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske <fh...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Juho,
>>>>>>
>>>>>> the partitions of both topics are consumed independently, i.e., each at
>>>>>> its own speed without coordination. With the configuration that Gordon
>>>>>> linked, watermarks are generated per partition.
>>>>>> Each source task maintains the latest (and highest) watermark per
>>>>>> partition and propagates the smallest of these watermarks. The same
>>>>>> mechanism is applied for watermarks across tasks (this is what Kien
>>>>>> referred to).
>>>>>>
>>>>>> In the case that you are describing, the partitions of the smaller
>>>>>> topic are consumed faster (hence their offsets catch up sooner) but
>>>>>> watermarks are emitted "at the speed" of the bigger topic.
>>>>>> Therefore, the timestamps of records from the smaller topic can be
>>>>>> far ahead of the watermark.
>>>>>> In principle, that does not pose a problem. Stateful operators (such
>>>>>> as windows) remember the "early" records and process them when they
>>>>>> receive a watermark that passes the timestamps of the early records.
>>>>>>
>>>>>> Regarding your question "Are they committed to Kafka before their
>>>>>> watermark has passed on Flink's side?":
>>>>>> The offsets of the smaller topic might be checkpointed when all its
>>>>>> partitions have been read to the "end" while the bigger topic is still
>>>>>> catching up.
>>>>>> The watermarks are moving at the speed of the bigger topic, but all
>>>>>> "early" events of the smaller topic are stored in stateful operators and
>>>>>> are checkpointed as well.
>>>>>>
>>>>>> So, you lose neither early nor late data.
>>>>>>
>>>>>> Best, Fabian
>
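The buffering Fabian describes can be illustrated with a small Java simulation of a tumbling event-time window, assuming a per-window record count as the result and no allowed lateness. It is not Flink's WindowOperator; it only shows that "early" records from the faster topic sit in state until the watermark passes the end of their window (taken here as end - 1, the window's last timestamp), while records whose window has already fired are counted as late.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulation sketch: tumbling event-time windows that hold records in state
// until the watermark passes each window's last timestamp.
class TumblingWindowBuffer {
    private final long sizeMs;
    // Window start -> number of buffered records; TreeMap keeps windows in time order.
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;
    private int lateRecords = 0;

    TumblingWindowBuffer(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    void onRecord(long eventTimestampMs) {
        long start = eventTimestampMs - Math.floorMod(eventTimestampMs, sizeMs);
        // The window [start, start + size) has already fired: the record is late.
        if (start + sizeMs - 1 <= watermark) {
            lateRecords++;
            return;
        }
        pending.merge(start, 1, Integer::sum); // "early" or on-time record kept in state
    }

    // Advances the watermark and returns "start:count" for every window that fires.
    List<String> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<String> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() + sizeMs - 1 <= watermark) {
            Map.Entry<Long, Integer> e = pending.pollFirstEntry();
            fired.add(e.getKey() + ":" + e.getValue());
        }
        return fired;
    }

    int bufferedWindows() {
        return pending.size();
    }

    int lateRecords() {
        return lateRecords;
    }
}
```

For example, with 10-second windows, records at 25 s and 26 s from the small topic stay buffered while a watermark of 9.999 s fires only the window holding a lagging 3 s record from the big topic; nothing is dropped as long as the watermark follows the slower topic.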

Re: Kafka consumer to sync topics by event time?

Posted by Juho Autio <ju...@rovio.com>.
Great. I'd be happy to contribute. I added 2 sub-tasks in
https://issues.apache.org/jira/browse/FLINK-5479.

Could someone with the required privileges assign this sub-task to me?
https://issues.apache.org/jira/browse/FLINK-9183

Re: Kafka consumer to sync topics by event time?

Posted by Fabian Hueske <fh...@gmail.com>.
Fully agree Juho!

Do you want to contribute the docs fix?
If yes, we should update FLINK-5479 to make sure that the warning is
removed once the bug is fixed.

Thanks, Fabian

Re: Kafka consumer to sync topics by event time?

Posted by Juho Autio <ju...@rovio.com>.
Looks like the bug https://issues.apache.org/jira/browse/FLINK-5479
entirely prevents this feature from being used if there are any idle
partitions. It would be nice to mention in the documentation that currently
this requires all subscribed partitions to have a constant stream of data
with growing timestamps. When the watermark stalls on an idle partition,
it blocks everything.

Link to current documentation:
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
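A minimal Java simulation of the stall described above, assuming watermarks equal to record timestamps and a per-task minimum over partitions. The markIdle method is hypothetical: it only illustrates what excluding an idle partition from the minimum would achieve, and is not an API of the Flink Kafka consumer discussed in this thread.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simulation sketch of the FLINK-5479 symptom: the task watermark is the minimum over
// all partitions, so a partition that never receives data pins it at Long.MIN_VALUE.
class IdlePartitionDemo {
    // Highest timestamp seen per partition (watermark = timestamp, for simplicity).
    private final Map<String, Long> perPartition = new LinkedHashMap<>();
    // Partitions excluded from the minimum via the hypothetical markIdle().
    private final Set<String> idle = new HashSet<>();

    IdlePartitionDemo(String... partitions) {
        for (String p : partitions) {
            perPartition.put(p, Long.MIN_VALUE);
        }
    }

    void onRecord(String partition, long eventTimestampMs) {
        perPartition.merge(partition, eventTimestampMs, Long::max);
        idle.remove(partition); // receiving data makes a partition active again
    }

    // Hypothetical: not an API of the Flink Kafka consumer discussed in this thread.
    void markIdle(String partition) {
        idle.add(partition);
    }

    // Minimum over all non-idle partitions; Long.MIN_VALUE if every partition is idle.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : perPartition.entrySet()) {
            if (idle.contains(e.getKey())) {
                continue;
            }
            anyActive = true;
            min = Math.min(min, e.getValue());
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With "topic-0" receiving data up to 50 s and "topic-1" receiving nothing, the watermark stays at Long.MIN_VALUE, so no event-time window downstream can fire.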

Re: Kafka consumer to sync topics by event time?

Posted by Fabian Hueske <fh...@gmail.com>.
You are right, offsets cannot be used for tracking processing progress. I
think setting Kafka offsets with respect to some progress notion other than
"has been consumed" would be highly application-specific and hard to
generalize.
As you said, there might be a window (such as a session window) that is
open much longer than all other windows and which would hold back the
offset. Other applications might not use the built-in windows at all, but
rather custom ProcessFunctions.

Have you considered tracking progress using watermarks?
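One way to read "tracking progress using watermarks" is to report event-time lag, i.e. how far the current watermark trails wall-clock time. The Java helper below is a hypothetical sketch: it assumes the watermark value is obtained elsewhere (for instance inside a ProcessFunction via ctx.timerService().currentWatermark()) and treats a window ending at E as complete once the watermark reaches E - 1, its last timestamp.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical monitoring helper: progress expressed in event time rather than offsets.
final class EventTimeProgress {
    private EventTimeProgress() {}

    // Wall-clock time minus the current watermark; fails if no watermark has been emitted.
    static Duration lag(long watermarkMs, Instant now) {
        if (watermarkMs == Long.MIN_VALUE) {
            throw new IllegalStateException("no watermark emitted yet");
        }
        return Duration.ofMillis(now.toEpochMilli() - watermarkMs);
    }

    // True once the watermark has reached the last timestamp (windowEndMs - 1)
    // of a window ending at windowEndMs, i.e. such a window may already have fired.
    static boolean windowComplete(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }
}
```

Unlike committed Kafka offsets, this figure only moves when the slowest partition moves, so it reflects what downstream windows have actually been able to emit.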

2017-12-04 14:42 GMT+01:00 Juho Autio <ju...@rovio.com>:

> Thank you Fabian. Really clear explanation. That matches with my
> observation indeed (data is not dropped from either small or big topic, but
> the offsets are advancing in kafka side already before those offsets have
> been triggered from a window operator).
>
> This means that it's a bit harder to meaningfully monitor the job's
> progress solely based on kafka consumer offsets. Is there a reason why
> Flink couldn't instead commit the offsets after they have been triggered
> from downstream windows? I could imagine that this might pose a problem if
> there are any windows that remain open for a very long time, but in general
> it would be useful IMHO. Or Flink could even commit both (read vs.
> triggered) offsets to kafka for monitoring purposes.

Re: Kafka consumer to sync topics by event time?

Posted by Juho Autio <ju...@rovio.com>.
Thank you, Fabian. Really clear explanation. That indeed matches my
observation (data is not dropped from either the small or the big topic, but
the offsets already advance on the Kafka side before the records at those
offsets have been triggered by a window operator).

This means that it's a bit harder to meaningfully monitor the job's
progress solely based on Kafka consumer offsets. Is there a reason why
Flink couldn't instead commit the offsets after the corresponding records
have been triggered by downstream windows? I could imagine that this might
pose a problem if some windows remain open for a very long time, but in
general it would be useful IMHO. Or Flink could even commit both (read vs.
triggered) offsets to Kafka for monitoring purposes.
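The "triggered offset" idea could be sketched roughly as follows. This is a hypothetical helper, not an existing Flink feature or API. It assumes each partition's records are given as event timestamps in offset order and, to keep things simple, that a record counts as triggered once the watermark reaches its timestamp. It returns the first offset whose record has not fired yet, i.e. a value one might report for monitoring next to the normal read offset.

```java
/**
 * Hypothetical monitoring helper (not a Flink feature): computes a
 * "triggered offset" for one partition from its records' event timestamps
 * (in offset order) and the current watermark. Simplifying assumption: a
 * record has fired once the watermark reaches its timestamp.
 */
public class TriggeredOffsets {
    /** Returns the first offset whose record has not fired yet. */
    static int triggeredOffset(long[] timestampsInOffsetOrder, long watermark) {
        int offset = 0;
        // Stop at the first unfired record: with out-of-order data a later
        // offset may already have fired, but only this prefix is fully done.
        while (offset < timestampsInOffsetOrder.length
                && timestampsInOffsetOrder[offset] <= watermark) {
            offset++;
        }
        return offset;
    }

    public static void main(String[] args) {
        long watermark = 150; // held back by the lagging big topic
        // Small topic fully read (read offset 3), but only its first record has fired.
        System.out.println(triggeredOffset(new long[] {100, 200, 300}, watermark)); // 1
        // Out-of-order data: ts 400 at offset 1 blocks offset 2 although ts 120 fired.
        System.out.println(triggeredOffset(new long[] {100, 400, 120}, watermark)); // 1
    }
}
```

Stopping at the first unfired record keeps the reported offset conservative; as noted above, a window that stays open for a long time would hold such an offset back for just as long.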

On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Juho,
>
> the partitions of both topics are consumed independently, i.e., each at its
> own speed without coordination. With the configuration that Gordon linked,
> watermarks are generated per partition.
> Each source task keeps the latest (and highest) watermark of each of its
> partitions and propagates the smallest of these watermarks. The same
> mechanism is applied for watermarks across tasks (this is what Kien referred
> to).
>
> In the case that you are describing, the partitions of the smaller topic
> are consumed faster (hence their offsets catch up sooner), but watermarks
> are emitted "at the speed" of the bigger topic.
> Therefore, the timestamps of records from the smaller topic can be far
> ahead of the watermark.
> In principle, that does not pose a problem. Stateful operators (such as
> windows) remember the "early" records and process them once they receive a
> watermark that passes the timestamps of the early records.
>
> Regarding your question "Are they committed to Kafka before their
> watermark has passed on Flink's side?":
> The offsets of the smaller topic might be checkpointed once all of its
> partitions have been read to the "end", while the bigger topic is still
> catching up.
> The watermarks move at the speed of the bigger topic, but all "early"
> events of the smaller topic are stored in stateful operators and are
> checkpointed as well.
>
> So, you lose neither early nor late data.
>
> Best, Fabian

Re: Kafka consumer to sync topics by event time?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Juho,

the partitions of both topics are consumed independently, i.e., each at its
own speed without coordination. With the configuration that Gordon linked,
watermarks are generated per partition.
Each source task keeps the latest (and highest) watermark of each of its
partitions and propagates the smallest of these watermarks. The same
mechanism is applied for watermarks across tasks (this is what Kien referred
to).
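The per-partition aggregation described above can be illustrated with a small, Flink-independent Java sketch. It is a simplified model, not Flink's implementation or API; the partition names such as "small_topic-0" and the millisecond timestamps are hypothetical. The model keeps the highest watermark seen for each partition and emits the minimum across partitions, and only when that minimum advances.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified model (not Flink code) of a source task combining
 * per-partition watermarks: remember the highest watermark per partition,
 * emit the minimum over all partitions whenever that minimum advances.
 */
public class PerPartitionWatermarks {
    private final Map<String, Long> highestPerPartition = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    /** Registers a partition before it has produced any watermark. */
    public void addPartition(String partition) {
        highestPerPartition.putIfAbsent(partition, Long.MIN_VALUE);
    }

    /** Returns the newly emitted source watermark, or null if it did not advance. */
    public Long onPartitionWatermark(String partition, long watermark) {
        // Keep only the highest watermark per partition (watermarks never go back).
        highestPerPartition.merge(partition, watermark, Math::max);
        long min = Long.MAX_VALUE;
        for (long w : highestPerPartition.values()) {
            min = Math.min(min, w);
        }
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        PerPartitionWatermarks source = new PerPartitionWatermarks();
        source.addPartition("small_topic-0");
        source.addPartition("big_topic-0");
        // The small topic races ahead; nothing is emitted while big_topic-0 has no watermark.
        System.out.println(source.onPartitionWatermark("small_topic-0", 5_000)); // null
        // The lagging partition now determines the emitted watermark.
        System.out.println(source.onPartitionWatermark("big_topic-0", 1_000)); // 1000
        System.out.println(source.onPartitionWatermark("big_topic-0", 3_000)); // 3000
    }
}
```

In this model a partition that has not reported any watermark yet holds the source watermark back, which is exactly why event time follows the slower "big topic".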

In the case that you are describing, the partitions of the smaller topic
are consumed faster (hence their offsets catch up sooner), but watermarks
are emitted "at the speed" of the bigger topic.
Therefore, the timestamps of records from the smaller topic can be far
ahead of the watermark.
In principle, that does not pose a problem. Stateful operators (such as
windows) remember the "early" records and process them once they receive a
watermark that passes the timestamps of the early records.
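A minimal sketch of the buffering described above, assuming fixed-size tumbling event-time windows and String payloads (both hypothetical choices); it is a simplified model, not Flink's window operator. In this sketch a window fires once the watermark reaches its last timestamp (window end minus one), so a record from the fast "small topic" waits in state until the slower topic lets the watermark advance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Simplified model (not Flink code) of event-time tumbling windows:
 * records ahead of the watermark stay buffered; a window fires once the
 * watermark reaches its last timestamp (window end - 1).
 */
public class EarlyRecordBuffer {
    private final long windowSize;
    // Last timestamp of each window -> records buffered for that window.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    public EarlyRecordBuffer(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Buffers a record in the tumbling window that contains its timestamp. */
    public void onRecord(String payload, long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        long windowMaxTimestamp = windowStart + windowSize - 1;
        pending.computeIfAbsent(windowMaxTimestamp, k -> new ArrayList<>()).add(payload);
    }

    /** Fires and removes every window whose last timestamp is <= watermark. */
    public List<String> onWatermark(long watermark) {
        NavigableMap<Long, List<String>> ready = pending.headMap(watermark, true);
        List<String> fired = new ArrayList<>();
        for (List<String> records : ready.values()) {
            fired.addAll(records);
        }
        ready.clear(); // clearing the view also removes these windows from state
        return fired;
    }

    /** Number of windows still holding buffered records. */
    public int bufferedWindows() {
        return pending.size();
    }

    public static void main(String[] args) {
        EarlyRecordBuffer windows = new EarlyRecordBuffer(1_000);
        windows.onRecord("small-a", 4_200); // from the fast small topic, far ahead
        windows.onRecord("big-a", 1_500);   // from the lagging big topic
        System.out.println(windows.onWatermark(1_000)); // []
        System.out.println(windows.onWatermark(2_000)); // [big-a]
        System.out.println(windows.onWatermark(5_000)); // [small-a]
    }
}
```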

Regarding your question "Are they committed to Kafka before their watermark
has passed on Flink's side?":
The offsets of the smaller topic might be checkpointed once all of its
partitions have been read to the "end", while the bigger topic is still
catching up.
The watermarks move at the speed of the bigger topic, but all "early"
events of the smaller topic are stored in stateful operators and are
checkpointed as well.
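To make the offset observation concrete, here is a toy Java model (not Flink code) of what one checkpoint captures. Assumptions: each partition is an array of event timestamps in offset order, the watermark is the minimum of the per-partition maximum timestamps (no out-of-orderness bound), and a record counts as emitted once the watermark reaches its timestamp. The topic names and numbers are made up. The small topic's offset is already at its end while most of its records are still held in operator state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Flink code) of one checkpoint taken while one topic has
 * been read to its end and another is still catching up.
 */
public class CheckpointSketch {
    static final class Snapshot {
        final Map<String, Integer> nextOffsets = new LinkedHashMap<>(); // source state
        final List<Long> bufferedEarly = new ArrayList<>();             // window state
        final List<Long> emitted = new ArrayList<>();                   // already sent downstream
        long watermark = Long.MAX_VALUE;
    }

    /** Reads readUpTo.get(p) records of each partition p, then checkpoints. */
    static Snapshot checkpoint(Map<String, long[]> partitions, Map<String, Integer> readUpTo) {
        Snapshot s = new Snapshot();
        List<Long> read = new ArrayList<>();
        for (Map.Entry<String, long[]> e : partitions.entrySet()) {
            int next = readUpTo.get(e.getKey());
            s.nextOffsets.put(e.getKey(), next);
            long highest = Long.MIN_VALUE; // stays MIN_VALUE if nothing was read
            for (int i = 0; i < next; i++) {
                read.add(e.getValue()[i]);
                highest = Math.max(highest, e.getValue()[i]);
            }
            // The source emits the minimum of the per-partition watermarks.
            s.watermark = Math.min(s.watermark, highest);
        }
        for (long ts : read) {
            // At or behind the watermark: fired. Ahead of it: kept in (checkpointed) state.
            if (ts <= s.watermark) {
                s.emitted.add(ts);
            } else {
                s.bufferedEarly.add(ts);
            }
        }
        return s;
    }

    public static void main(String[] args) {
        Map<String, long[]> partitions = new LinkedHashMap<>();
        partitions.put("small_topic-0", new long[] {100, 200, 300});
        partitions.put("big_topic-0", new long[] {50, 150, 250, 350, 450, 550});
        Map<String, Integer> readUpTo = new LinkedHashMap<>();
        readUpTo.put("small_topic-0", 3); // fully read: offset already at the end
        readUpTo.put("big_topic-0", 2);   // still catching up
        Snapshot s = checkpoint(partitions, readUpTo);
        System.out.println(s.watermark);     // 150
        System.out.println(s.emitted);       // [100, 50, 150]
        System.out.println(s.bufferedEarly); // [200, 300]
    }
}
```

Every record before the checkpointed offsets is either already emitted or part of the snapshotted state, so restoring from this checkpoint loses nothing even though the small topic's offset ran ahead of the watermark.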

So, you lose neither early nor late data.

Best, Fabian



2017-12-01 13:43 GMT+01:00 Juho Autio <ju...@rovio.com>:

> Thanks for the answers. I still don't understand why I see the offsets of
> the "small topic" being committed to Kafka so quickly. Are they committed
> to Kafka before their watermark has passed on Flink's side? That would be
> quite confusing. I'm aware that when Flink handles the state/offsets
> internally, the consumer offsets are committed to Kafka just for reference.
>
> Otherwise, what you're saying sounds very good to me. The documentation
> just doesn't explicitly say anything about how it works across topics.
>
> On Kien's answer: "When you join multiple stream with different
> watermarks", note that I'm not joining any topics myself; I get them as a
> single stream from the Flink Kafka consumer based on the list of topics
> that I asked for.
>
> Thanks,
> Juho