Posted to user@flink.apache.org by Tony Wei <to...@gmail.com> on 2017/09/05 06:50:41 UTC

How to flush all window states after Kafka (0.10.x) topic was removed

Hi,

I have a simple streaming job that consumes data from Kafka and uses a time
window to aggregate it.
I am wondering whether there is a built-in way to emit a max watermark when
the consumer finds that the topic is no longer available, so that the window
function can flush all of its state to the sink function.

My Kafka version is 0.10.x. Currently, my workaround is to use a
`TimestampAssigner` that checks for a specific record serving as a termination
message and sets the watermark to Long.MAX_VALUE.
I will send this message to all partitions before I remove the topic.
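
The workaround described above could be written as a punctuated watermark assigner. The sketch below is a minimal, dependency-free illustration rather than the actual job code: `SketchEvent`, `SketchWatermark`, and `TerminationAwareAssigner` are hypothetical stand-ins, and the two methods only mirror the shape of Flink's `AssignerWithPunctuatedWatermarks` (`extractTimestamp` and `checkAndGetNextWatermark`). In a real job the class would implement that interface and return Flink's own `Watermark` type.

```java
// Hypothetical stand-in for Flink's Watermark; only the timestamp matters here.
final class SketchWatermark {
    final long timestamp;

    SketchWatermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Hypothetical record type: an event-time timestamp plus a termination flag.
final class SketchEvent {
    final long timestampMillis;
    final boolean isTermination;

    SketchEvent(long timestampMillis, boolean isTermination) {
        this.timestampMillis = timestampMillis;
        this.isTermination = isTermination;
    }
}

// Mirrors the method shapes of AssignerWithPunctuatedWatermarks<SketchEvent> (assumption).
final class TerminationAwareAssigner {

    // Event time is taken from the record itself.
    long extractTimestamp(SketchEvent element, long previousElementTimestamp) {
        return element.timestampMillis;
    }

    // Called for each record after its timestamp has been extracted.
    SketchWatermark checkAndGetNextWatermark(SketchEvent lastElement, long extractedTimestamp) {
        if (lastElement.isTermination) {
            // The termination message pushes event time to the end of time,
            // so every pending window becomes eligible to fire.
            return new SketchWatermark(Long.MAX_VALUE);
        }
        // Regular records advance the watermark to their own timestamp
        // (assumes in-order data per partition, which is a simplification).
        return new SketchWatermark(extractedTimestamp);
    }
}
```

Because a downstream operator only advances to the minimum watermark across its inputs, the termination record has to arrive on every partition before the windows flush, which is why it is sent to all partitions.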

I would appreciate any suggestions. Thank you.

Best Regards,
Tony Wei

Re: How to flush all window states after Kafka (0.10.x) topic was removed

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Tony,

Currently, the functionality that you described does not exist in the
consumer. When a topic is deleted, as far as I know, the consumer simply
considers the partitions unreachable and keeps trying to fetch records
from them until they are up again.
I'm not entirely sure whether the Kafka API makes a removed topic
distinguishable from a partition that is temporarily out of service because
Kafka brokers are down; I may need to take a look.

As for the "workaround" that you are using at the moment, you can actually
use `KeyedDeserializationSchema#isEndOfStream` for that. When that returns
true and the source subtask closes, the Long.MAX_VALUE watermark will be
emitted.
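
As a rough illustration of the `isEndOfStream` approach, here is a minimal sketch that runs without Flink on the classpath. `SketchKeyedDeserializationSchema` is a hypothetical stand-in that copies only the two relevant method shapes of `KeyedDeserializationSchema` (the real interface has further requirements that are omitted here), and the `END_MARKER` payload is an assumed convention for the termination message, not something Flink or Kafka defines.

```java
// Hypothetical stand-in copying two method shapes of Flink's KeyedDeserializationSchema<T>.
interface SketchKeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);

    boolean isEndOfStream(T nextElement);
}

// Treats one agreed-upon payload as the termination message.
final class EndMarkerSchema implements SketchKeyedDeserializationSchema<String> {

    // Assumed marker value; producers would send it to every partition before topic removal.
    static final String END_MARKER = "__END_OF_STREAM__";

    @Override
    public String deserialize(byte[] messageKey, byte[] message,
                              String topic, int partition, long offset) {
        // Records without a value are passed through as null.
        if (message == null) {
            return null;
        }
        return new String(message, java.nio.charset.StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Returning true tells the source that no more records should be read.
        return END_MARKER.equals(nextElement);
    }
}
```

With a schema along these lines, the termination check lives in the deserializer instead of the timestamp assigner, and the final Long.MAX_VALUE watermark comes from the source closing rather than from user code.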

Cheers,
Gordon
