Posted to user@flink.apache.org by Juan Rodríguez Hortalá <ju...@gmail.com> on 2016/11/21 06:57:40 UTC

IterativeStream seems to ignore maxWaitTimeMillis

Hi,

I wrote a proof of concept for a Java version of mapWithState with
time-based state eviction
https://github.com/juanrh/flink-state-eviction/blob/a6bb0d4ca0908d2f4350209a4a41e381e99c76c5/src/main/java/com/github/juanrh/streaming/MapWithStateIterPoC.java.
The idea is:

 - Convert an input KeyedStream with key K and value V into a KeyedStream
of Either<V, K>, with the original values as Left.
 - Replace a ValueState<S> with a ValueState for a POJO that, besides S,
stores the timestamp of the last time that state was accessed.
 - Define an IterativeStream from the Either stream, and apply a
transformation function that periodically sends "tombstone" events as Right
events in the closeWith of the IterativeStream. When a tombstone is
received, delete the state with clear() if the time since it was last
accessed is greater than a configured time to live.
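
A minimal plain-Java sketch of the eviction rule described above, with no
Flink dependency. The class names TimestampedState and EvictingStateStore
are hypothetical and are not taken from the linked PoC; a HashMap stands in
for Flink's keyed ValueState, and the timestamps are passed in explicitly so
the behaviour is easy to check.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical wrapper: the user state S plus the time it was last accessed. */
final class TimestampedState<S> {
    final S value;
    final long lastAccessMillis;

    TimestampedState(S value, long lastAccessMillis) {
        this.value = value;
        this.lastAccessMillis = lastAccessMillis;
    }

    /** True when the state has been idle for strictly longer than the time to live. */
    boolean isExpired(long nowMillis, long ttlMillis) {
        return nowMillis - lastAccessMillis > ttlMillis;
    }
}

/** Plain-Java stand-in for keyed state; a HashMap replaces Flink's ValueState. */
final class EvictingStateStore<K, S> {
    private final Map<K, TimestampedState<S>> states = new HashMap<>();
    private final long ttlMillis;

    EvictingStateStore(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** A regular (Left) event: store the new state and refresh its timestamp. */
    void onValue(K key, S newState, long nowMillis) {
        states.put(key, new TimestampedState<>(newState, nowMillis));
    }

    /** A tombstone (Right) event: clear the state only if it has expired. */
    boolean onTombstone(K key, long nowMillis) {
        TimestampedState<S> current = states.get(key);
        if (current != null && current.isExpired(nowMillis, ttlMillis)) {
            states.remove(key);
            return true;
        }
        return false;
    }

    boolean contains(K key) {
        return states.containsKey(key);
    }
}
```

With a time to live of 1000 ms, a tombstone that arrives 500 ms after the
last access leaves the state in place, while one that arrives 1001 ms after
it clears the state.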

This seems to work so far, but there are some things that look weird to me:

 - The program never seems to stop, even though I have defined the
IterativeStream with DataStream.iterate(long):
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#iterate-long-
The value of maxWaitTimeMillis seems to be ignored. I'm using a custom
source function, but it seems like the method SourceFunction.cancel() is
not being called.

 - I'm getting several messages "WARN MetricGroup: Name collision: Group
already contains a Metric with the name 'numRecordsOut'. Metric will not be
reported. (null)". What does that mean?

Thanks,

Juan

Re: IterativeStream seems to ignore maxWaitTimeMillis

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Thanks a lot for your suggestion Aljoscha, it helped me discover the
problem: I was using an Executor inside a RichFunction and I wasn't
shutting down the executor. Now I call executor.shutdownNow() in
RichFunction.close(), and the job stops when both the input and the loop
are exhausted.
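
A rough plain-Java sketch of that fix, with no Flink dependency. The class
TombstoneEmitter is hypothetical and only mimics the open()/close()
lifecycle of a RichFunction. It assumes the executor comes from the
Executors factory methods, whose default threads are non-daemon, so a
worker that is never shut down can keep the JVM alive after the job's own
work is done.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Stand-in for a Flink RichFunction: only the open()/close() lifecycle is
 * modelled here, and the class name is hypothetical.
 */
final class TombstoneEmitter {
    private ExecutorService executor;

    /** Mirrors RichFunction.open(): start the background worker. */
    void open() {
        executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            // Placeholder for periodic work, e.g. emitting tombstones.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(50);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop condition ends the task.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /** Mirrors RichFunction.close(): stop the worker so it cannot outlive the function. */
    void close() {
        if (executor != null) {
            executor.shutdownNow(); // interrupts the running task
        }
    }

    /** True once shutdown has been requested on the executor. */
    boolean isStopped() {
        return executor != null && executor.isShutdown();
    }
}
```

Calling open() and then close() leaves the executor shut down. The
background task has to react to interruption, as the loop above does,
because shutdownNow() only interrupts running tasks and cannot force them
to stop.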

Greetings,

Juan



On Wed, Nov 23, 2016 at 2:19 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Ah, cancel() won't be called on the source if it is already stopped, I
> think. Could you try boiling it down to the very basics, i.e. have just the
> source and an iteration, and check what happens?

Re: IterativeStream seems to ignore maxWaitTimeMillis

Posted by Aljoscha Krettek <al...@apache.org>.
Ah, cancel() won't be called on the source if it is already stopped, I
think. Could you try boiling it down to the very basics, i.e. have just the
source and an iteration, and check what happens?

On Wed, 23 Nov 2016 at 05:08 Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:

> Thanks for your answer Aljoscha,
>
> The source stops: when I comment out all the transformed streams and just
> print the input, the program completes. But this is a custom SourceFunction;
> could that be related? Maybe I should implement emitWatermark? I'm
> using ingestion time, so I assumed this wasn't needed.
>
> Greetings,
>
> Juan

Re: IterativeStream seems to ignore maxWaitTimeMillis

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Thanks for your answer Aljoscha,

The source stops: when I comment out all the transformed streams and just
print the input, the program completes. But this is a custom SourceFunction;
could that be related? Maybe I should implement emitWatermark? I'm using
ingestion time, so I assumed this wasn't needed.

Greetings,

Juan

On Mon, Nov 21, 2016 at 9:17 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Might it be that your initial source never stops? A loop will only
> terminate if both the original source stops and the loop timeout is reached.

Re: IterativeStream seems to ignore maxWaitTimeMillis

Posted by Aljoscha Krettek <al...@apache.org>.
Might it be that your initial source never stops? A loop will only
terminate if both the original source stops and the loop timeout is reached.
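
To illustrate that condition, here is a small plain-Java model of a loop
head that waits for feedback records. It is a sketch under stated
assumptions, not Flink's actual implementation: the class LoopTimeoutModel
and the method runLoopHead are made-up names, and a BlockingQueue stands in
for the feedback edge. The loop ends only once nothing has arrived for
maxWaitTimeMillis, so it cannot end while something upstream keeps
producing records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of an iteration head; not part of the Flink API. */
final class LoopTimeoutModel {

    /**
     * Consumes feedback records until the queue stays empty for
     * maxWaitTimeMillis, then returns everything consumed so far.
     */
    static List<String> runLoopHead(BlockingQueue<String> feedback,
                                    long maxWaitTimeMillis) {
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                // Wait at most maxWaitTimeMillis for the next feedback record.
                String next = feedback.poll(maxWaitTimeMillis, TimeUnit.MILLISECONDS);
                if (next == null) {
                    break; // nothing arrived within the timeout: the loop ends
                }
                seen.add(next);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was consumed so far.
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

If the queue is filled once and the producer then stops, runLoopHead
returns after one quiet period of maxWaitTimeMillis. If a producer keeps
adding records more often than that, the method never returns.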
