Posted to user@flink.apache.org by "victor.reut" <vi...@itrexgroup.com> on 2017/09/11 07:27:26 UTC
Update timeWindow size and trigger value at runtime
Hi,
I would like to update the timeWindow size and the trigger count of a
KeyedStream dynamically at runtime. For example, I have the following
code:
DataStream<SampleObject> stream = env.addSource(
    new FlinkKafkaConsumer09<>(TOPIC, new JSONDeserializer(), properties));

Integer numMinutes = ...
Integer triggersCount = ...

stream.keyBy("key")
    .timeWindow(Time.minutes(numMinutes))
    .trigger(CountTrigger.of(triggersCount))
    .reduce(new MetricsReduceFunction(), new MetricsTimeWindowReduceFunction())
    .print();
If I just change the values of the variables numMinutes and triggersCount,
Flink does not pick up the new values. I also have not found a good solution
on Google. Does anybody know how to solve this issue?
Re: Update timeWindow size and trigger value at runtime
Posted by Chesnay Schepler <ch...@apache.org>.
You cannot change the window size or the trigger count while a job is running.
For this to work you will have to take a savepoint, modify the
parameters, and restart the job from the savepoint.
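
One way to make the "modify the parameters" step cheap is to read both values from the program arguments when the job starts, so that a restart from the savepoint can pass new values without a code change. The following is only a minimal sketch in plain Java: the class WindowParams, the argument names --numMinutes and --triggersCount, and the default values 1 and 10 are assumptions made for illustration and do not come from the original code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: reads window settings from program arguments at job start. */
public final class WindowParams {
    public final int numMinutes;
    public final int triggersCount;

    private WindowParams(int numMinutes, int triggersCount) {
        this.numMinutes = numMinutes;
        this.triggersCount = triggersCount;
    }

    /** Parses "--key value" pairs; the defaults (1 and 10) are assumed for illustration. */
    public static WindowParams parse(String[] args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Expected --key value pairs");
        }
        Map<String, String> kv = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Bad argument name: " + args[i]);
            }
            kv.put(args[i].substring(2), args[i + 1]);
        }
        int numMinutes = Integer.parseInt(kv.getOrDefault("numMinutes", "1"));
        int triggersCount = Integer.parseInt(kv.getOrDefault("triggersCount", "10"));
        if (numMinutes <= 0 || triggersCount <= 0) {
            throw new IllegalArgumentException("Both values must be positive");
        }
        return new WindowParams(numMinutes, triggersCount);
    }

    public static void main(String[] args) {
        WindowParams p = parse(args);
        // In the job's main(), these values would replace the local variables:
        //   .timeWindow(Time.minutes(p.numMinutes))
        //   .trigger(CountTrigger.of(p.triggersCount))
        System.out.println("numMinutes=" + p.numMinutes
            + ", triggersCount=" + p.triggersCount);
    }
}
```

With something like this in place, the cycle would be: take a savepoint of the running job, stop it, and resubmit it from the savepoint with different argument values. Flink also ships its own ParameterTool class for argument parsing, which could replace the hand-written parser above.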