Posted to user@flink.apache.org by Dan Hill <qu...@gmail.com> on 2021/03/03 07:15:40 UTC

Flink, local development, finish processing a stream of Kafka data

Hi.

For local development and tests, I want to flush the events in my system to
make sure I'm processing everything.  My watermark does not progress far
enough to finish all of the data.

What's the best practice for local development or tests?

If I use idle sources for 1 Kafka partition, this appears broken.  I'm
guessing there is logic to prevent removing an idle partition if it's the
only partition.  Is there a version of this I can enable for local
development that supports 1 partition?

I see this tech talk.  Are there other talks to watch?
https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be

Do I need to write my own watermark generator?  Or change my test data to
have a way of generating watermarks?

I've tried a few variants of the following source code.  The watermark
doesn't progress in the operator right after creating the source.

SingleOutputStreamOperator<T> viewInput = env.addSource(...)
        .uid("source-view")
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withIdleness(Duration.ofMinutes(1)));

Re: Does WatermarkStrategy.withIdleness work?

Posted by Dan Hill <qu...@gmail.com>.
JFYI in case other users find this in the future.

ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor has a small
issue if it is adapted to the new watermark API and events can share the
same timestamp.  I changed my code to do this in onPeriodicEmit.  In our
situation, we have a lot of events with the same timestamp.  If the code is
still processing events for that timestamp, the periodic emit thinks we ran
out of events (even though we've processed a bunch of events) and then
returns a bad watermark.  We modified our copy of this code to keep track of
how many events have been emitted.  Since we're just using this for local
development, it's fine.
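
For anyone who wants to see the shape of that fix, here is a minimal sketch
in plain Java. It is not the code from the linked extractor and not Flink
code: the class name, the trailingMs parameter, the injected clock and the
exact idle rule are all assumptions made for illustration. Only the method
names onEvent and onPeriodicEmit are borrowed from the new watermark API.
The point is that activity is detected by counting events, so a run of
events with identical timestamps still counts as progress.

```java
import java.util.function.LongSupplier;

/** Illustrative sketch only; mirrors the shape of a periodic watermark generator. */
final class ProcessingTimeTrailingSketch {
    private final long maxOutOfOrdernessMs; // bound used while events are flowing
    private final long idleThresholdMs;     // how long without events counts as idle
    private final long trailingMs;          // how far behind processing time to trail when idle
    private final LongSupplier clock;       // injected so the sketch can be tested

    private long maxTimestamp = Long.MIN_VALUE;
    private long eventsSeen = 0;
    private long eventsSeenAtLastEmit = 0;
    private long lastActivity = 0;
    private long lastWatermark = Long.MIN_VALUE;

    ProcessingTimeTrailingSketch(long maxOutOfOrdernessMs, long idleThresholdMs,
                                 long trailingMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleThresholdMs = idleThresholdMs;
        this.trailingMs = trailingMs;
        this.clock = clock;
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        eventsSeen++; // counts every event, even if the timestamp did not change
    }

    long onPeriodicEmit() {
        if (eventsSeen == 0) {
            return lastWatermark; // nothing seen yet, nothing to base a watermark on
        }
        long now = clock.getAsLong();
        if (eventsSeen != eventsSeenAtLastEmit) {
            // Events arrived since the previous emit: the stream is not idle.
            eventsSeenAtLastEmit = eventsSeen;
            lastActivity = now;
        }
        long candidate = (now - lastActivity > idleThresholdMs)
                ? now - trailingMs                    // idle: follow processing time
                : maxTimestamp - maxOutOfOrdernessMs; // active: usual bounded out-of-orderness
        lastWatermark = Math.max(lastWatermark, candidate); // never move backwards
        return lastWatermark;
    }
}
```

Comparing timestamps instead of counts would treat a batch of same-timestamp
events as "no new data" and could jump the watermark ahead too early, which
is the issue described above.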



Re: Does WatermarkStrategy.withIdleness work?

Posted by Dan Hill <qu...@gmail.com>.
Thanks David!


Re: Does WatermarkStrategy.withIdleness work?

Posted by David Anderson <da...@apache.org>.
WatermarkStrategy.withIdleness works by marking idle streams as idle, so
that downstream operators will ignore those streams and allow the
watermarks to progress based only on the advancement of the watermarks of
the still active streams. As you suspected, this mechanism does not provide
for the watermark to be advanced in situations where all of the streams are
idle.
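
As a rough illustration of the behaviour described above, here is a toy
model in plain Java. It is a simplification and not Flink's implementation;
the class and method names are made up for this sketch. An operator's
watermark is taken as the minimum over its non-idle inputs, and when every
input is idle there is nothing to take a minimum over, so the watermark
stays put.

```java
import java.util.Arrays;

/** Toy model of combining per-input watermarks; not Flink code. */
final class CombinedWatermarkSketch {
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long[] channelWatermarks;
    private final boolean[] idle;
    private long combined = NO_WATERMARK;

    CombinedWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, NO_WATERMARK);
        idle = new boolean[channels];
    }

    void onWatermark(int channel, long watermark) {
        idle[channel] = false; // an input that emits again is active again
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        recompute();
    }

    void markIdle(int channel) {
        idle[channel] = true;
        recompute();
    }

    long currentWatermark() {
        return combined;
    }

    private void recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // Only active inputs can move the combined watermark forward.
        if (anyActive && min > combined) {
            combined = min;
        }
    }
}
```

With two inputs, marking the silent one idle lets the other input's
watermark through. With a single input (one Kafka partition, for example),
marking it idle changes nothing.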

If your goal is to ensure that all of the events are processed and all
event-time timers are fired (and all event-time windows are closed) before
a job ends, Flink already includes a mechanism for this purpose. If you are
using a bounded source, then when that source reaches the end of its input,
a final Watermark of value Watermark.MAX_WATERMARK will be automatically
emitted. The --drain option, as in

./bin/flink stop --drain <job-id>

also has this effect [1].
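
To make the effect of that final watermark concrete, here is a small plain
Java sketch (not Flink code; the class and method names are hypothetical).
It assumes the usual rule that an event-time timer fires once the watermark
reaches its timestamp. Since Watermark.MAX_WATERMARK carries the timestamp
Long.MAX_VALUE, it fires every timer that is still pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy event-time timer service; not Flink code. */
final class EventTimeTimerSketch {
    // Same numeric value as the timestamp of Flink's Watermark.MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Returns the timers fired by this watermark, in timestamp order. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    int pendingTimers() {
        return timers.size();
    }
}
```

An ordinary watermark only fires the timers at or below it; advancing to
MAX_WATERMARK leaves pendingTimers() at zero.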

With a Kafka source, you can arrange for this to happen by having your
Kafka deserializer return true from its isEndOfStream() method. Or you
could use the new KafkaSource connector included in Flink 1.12 with
its setBounded option.

On the other hand, if you really did need to advance the watermark despite
a (possibly temporary) total lack of events, you could implement a
watermark strategy that artificially advances the watermark based on the
passage of processing time. You'll find an example in [2], though it hasn't
been updated to use the new watermark strategy interface.

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint
[2]
https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java


Does WatermarkStrategy.withIdleness work?

Posted by Dan Hill <qu...@gmail.com>.
I haven't been able to get WatermarkStrategy.withIdleness to work.  Is it
broken?  None of my timers trigger when I'd expect idleness to take over.
