Posted to user@flink.apache.org by aitozi <gj...@gmail.com> on 2017/08/03 15:41:40 UTC

WaterMark & Eventwindow not fired correctly

Hi,

I have encountered a problem. I generate and assign watermarks on a DataStream, then apply keyBy, an event-time window, and a window function.

In the log, I can see that the watermark and the event time of each message are correct, and I think the following conditions should trigger the window function:

1. watermark time >= window_end_time
2. there is data in [window_start_time, window_end_time)
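The firing rule in the list above can be sketched in plain Java. This is a simplified model, assuming the behavior of Flink's default event-time trigger for tumbling windows: a window [start, end) only exists once an element has been assigned to it, and it fires as soon as the watermark reaches the window's last timestamp, end - 1. So condition 1 is, strictly, watermark >= window_end_time - 1. TumblingWindowModel and its methods are hypothetical names; no Flink classes are used.

```java
// Simplified model of when an event-time tumbling window fires.
// Assumption: mirrors the rule of Flink's default event-time trigger
// (fire once watermark >= window end - 1). This is not Flink code.
final class TumblingWindowModel {

    /** Start (inclusive) of the tumbling window that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        // floorMod keeps the result correct for negative timestamps too
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Largest timestamp that still belongs to the window [start, start + size). */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    /** A window must hold at least one element and the watermark must reach its max timestamp. */
    static boolean shouldFire(boolean hasElements, long windowStart, long size, long watermark) {
        return hasElements && watermark >= maxTimestamp(windowStart, size);
    }

    public static void main(String[] args) {
        long size = 10_000L;                     // 10-second windows, in milliseconds
        long start = windowStart(12_345L, size); // window [10000, 20000)
        System.out.println(start);                                   // 10000
        System.out.println(shouldFire(true, start, size, 19_998L));  // false
        System.out.println(shouldFire(true, start, size, 19_999L));  // true
        System.out.println(shouldFire(false, start, size, 25_000L)); // false
    }
}
```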

I checked the log, and both conditions are satisfied. When I tried adding trigger(CountTrigger.of(5)), I could see in the log that the window apply function was invoked.
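The count trigger, by contrast, never looks at the watermark. A minimal sketch, assuming CountTrigger-like behavior (a per-window counter that fires every maxCount elements and then resets); CountFiringModel is a hypothetical name. Because calling trigger(...) on a windowed stream replaces the window's default trigger, this is consistent with the window firing under CountTrigger.of(5) even if the watermark is stuck.

```java
// Simplified model of count-based firing, assuming CountTrigger-like behavior.
// Note that no watermark is passed in: firing depends only on element count.
final class CountFiringModel {
    private final long maxCount;
    private long count;

    CountFiringModel(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Returns true when this element makes the window fire. */
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // start counting towards the next firing
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountFiringModel trigger = new CountFiringModel(5);
        int fired = 0;
        for (int i = 0; i < 12; i++) {
            if (trigger.onElement()) {
                fired++;
            }
        }
        System.out.println(fired); // 2 (after the 5th and the 10th element)
    }
}
```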

So I wonder why the window apply function cannot be triggered by event time and watermarks alone.

thanks,
aitozi



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: WaterMark & Eventwindow not fired correctly

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

So when the parallelism of the timestamp assigner is different from the parallelism of the map(KeyMapFunc()) or the window, then it works? But when the parallelism is the same, it does not work?

If this is true, then I would assume that some parallel instances of the timestamp assigner don't receive any events and therefore don't advance their watermark. Since an operator's watermark is the minimum of the watermarks arriving on all of its inputs, this in turn means that the downstream watermark doesn't advance either. Could you check in the web interface whether all parallel instances of the assigner are processing elements when you use the same parallelism for all operations?
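Why a single idle assigner instance blocks every downstream window can be sketched as follows. This is a simplified model, not Flink's implementation: the downstream watermark is taken as the minimum of the latest watermark received on each input channel, so one channel stuck at a low value pins it. MinWatermarkModel and the numbers used are hypothetical.

```java
import java.util.Arrays;

// Simplified model of watermark propagation across parallel instances:
// a downstream task's event-time clock is the minimum of the latest
// watermark received on each input channel. Not Flink's implementation.
final class MinWatermarkModel {
    private final long[] channelWatermarks;

    MinWatermarkModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one upstream instance and returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        // a channel's watermark never moves backwards
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return current();
    }

    long current() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermarkModel downstream = new MinWatermarkModel(3);
        downstream.onWatermark(0, 50_000L);
        downstream.onWatermark(1, 60_000L);
        // Channel 2 comes from an assigner instance that never sees an element,
        // so it keeps reporting the same low watermark.
        System.out.println(downstream.onWatermark(2, -5_000L)); // -5000
        downstream.onWatermark(0, 90_000L);
        System.out.println(downstream.current()); // still -5000
    }
}
```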

Best,
Aljoscha

> On 9. Aug 2017, at 11:33, aitozi <gj...@gmail.com> wrote:


Re: WaterMark & Eventwindow not fired correctly

Posted by aitozi <gj...@gmail.com>.
Hi, below is my code:

splitStream.select(duringTime + "")
        .map(new KeyMapFunc())
        .assignTimestampsAndWatermarks(new DelaySaltWatermarks())
        .setParallelism(300) // applies to the timestamp/watermark assigner only
        .keyBy(_SQL, _KEY, _SALT)
        // first window stage, keyed additionally by _SALT
        .window(TumblingEventTimeWindows.of(Time.seconds(duringTime / 10)))
        .apply(new WindowSaltFunc())
        .keyBy(_SQL, _KEY)
        // second window stage, duringTime seconds long
        .window(TumblingEventTimeWindows.of(Time.seconds(duringTime)))
        .apply(new WindowFunc())
        .addSink(new FlinkKafkaProducer010<>("topic", new SimpleSerializationSchema(), this.properties));

And here is the watermark assigner:

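import javax.annotation.Nullable;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class DelaySaltWatermarks implements AssignerWithPeriodicWatermarks<ContentMessage> {

    // Highest event timestamp this parallel instance has seen so far.
    // It stays 0 if the instance never receives an element.
    private long currentMaxTimestamp;

    // Called periodically by Flink; MAX_OUT_OF_ORDER is defined elsewhere in the code.
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDER);
    }

    @Override
    public long extractTimestamp(ContentMessage contentMessage, long previousElementTimestamp) {
        long timestamp = contentMessage.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}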
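To see what a single parallel instance of this assigner emits, the same logic can be copied into plain Java without Flink types. DelaySaltLogic is a hypothetical name, and MAX_OUT_OF_ORDER = 5000 ms is an assumed value, since the real constant is not shown in the thread. An instance that never receives an element keeps returning 0 - MAX_OUT_OF_ORDER, no matter how far the busy instances have advanced.

```java
// Plain-Java copy of the DelaySaltWatermarks logic with Flink types removed.
// MAX_OUT_OF_ORDER = 5000 ms is a hypothetical value for illustration only.
final class DelaySaltLogic {
    static final long MAX_OUT_OF_ORDER = 5_000L;

    private long currentMaxTimestamp; // starts at 0, as in the original class

    long extractTimestamp(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    long getCurrentWatermark() {
        return currentMaxTimestamp - MAX_OUT_OF_ORDER;
    }

    public static void main(String[] args) {
        DelaySaltLogic busy = new DelaySaltLogic();
        DelaySaltLogic idle = new DelaySaltLogic(); // receives no elements at all
        busy.extractTimestamp(1_502_000_000_000L);
        busy.extractTimestamp(1_502_000_030_000L);
        System.out.println(busy.getCurrentWatermark()); // 1502000025000
        System.out.println(idle.getCurrentWatermark()); // -5000, and it never changes
    }
}
```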

And when I change the parallelism of assignTimestampsAndWatermarks (to 300, as above), the window fires.

thanks,
aitozi


Re: WaterMark & Eventwindow not fired correctly

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

So I understand that you have roughly this pipeline:

Input 1 --\
           |- CoFlatMap - TimestampAndWatermarkAssigner - KeyBy - Window    
Input 2 --/

If the timestamp assigner comes after the CoFlatMap, the processElement() method of the extractor operator should still be called: not by the StreamInputProcessor, but by ChainingOutput [1], which connects the two-input CoFlatMap to the one-input operator that comes after it. There could still be a bug in there somewhere, however.
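The chaining argument can be illustrated with a simplified model. Every type here (OneInputOp, ChainedOutput, CoFlatMapOp, TimestampAssignerOp, ChainingDemo) is hypothetical and only mimics the idea behind Flink's ChainingOutput: the two-input operator's output calls the next operator's processElement() directly, so elements from both inputs reach the chained assigner.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of operator chaining; none of these are Flink classes.
interface OneInputOp<T> {
    void processElement(T record);
}

// Hands each emitted record straight to the next operator in the chain.
final class ChainedOutput<T> {
    private final OneInputOp<T> next;

    ChainedOutput(OneInputOp<T> next) {
        this.next = next;
    }

    void collect(T record) {
        next.processElement(record);
    }
}

// Two-input operator: both inputs forward to the same chained output.
final class CoFlatMapOp {
    private final ChainedOutput<Long> out;

    CoFlatMapOp(ChainedOutput<Long> out) {
        this.out = out;
    }

    void processElement1(long value) { out.collect(value); }

    void processElement2(long value) { out.collect(value); }
}

// One-input operator that tracks the max timestamp, like a periodic assigner.
final class TimestampAssignerOp implements OneInputOp<Long> {
    final List<Long> seen = new ArrayList<>();
    long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void processElement(Long timestamp) {
        seen.add(timestamp);
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }
}

final class ChainingDemo {
    public static void main(String[] args) {
        TimestampAssignerOp assigner = new TimestampAssignerOp();
        CoFlatMapOp coFlatMap = new CoFlatMapOp(new ChainedOutput<>(assigner));
        coFlatMap.processElement1(1_000L);
        coFlatMap.processElement2(3_000L);
        System.out.println(assigner.seen);         // [1000, 3000]
        System.out.println(assigner.maxTimestamp); // 3000
    }
}
```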

Could you maybe send me the relevant parts of your code so that I can have a look, or provide a minimal example?

Best,
Aljoscha

[1] https://github.com/apache/flink/blob/6f5fa7f741538207244368c275bee9958c43a25a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L394

> On 7. Aug 2017, at 19:21, aitozi <gj...@gmail.com> wrote:


Re: WaterMark & Eventwindow not fired correctly

Posted by aitozi <gj...@gmail.com>.
Hi,

My Flink version is 1.2.

I have been working on this problem for the past few days. Below is what I found.

When I use assignTimestampsAndWatermarks with the same parallelism (240) as the preceding operator, and that operator has two inputs (it is a "connected" CoFlatMap operator with parallelism 240), the watermark does not advance.

Then I looked into the source code. StreamTwoInputProcessor#processInput, called by TwoInputStreamTask, dispatches to processElement1() and processElement2(), but neither of them runs the processElement() call in StreamInputProcessor that leads to extractTimestamp() (in TimestampsAndPeriodicWatermarksOperator).

So the timestamp is never updated, and since my watermark is derived from the maximum timestamp in the same way as in the class BoundedOutOfOrdernessTimestampExtractor, the watermark does not advance either.

So, is it a bug that the timestamp is not updated when dealing with a two-input stream?

PS: My English is not very good; I hope you can understand me :)

thanks,
aitozi


Re: WaterMark & Eventwindow not fired correctly

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Could you please provide a snippet of code or a minimal example that would help us reproduce your problem?

Best,
Aljoscha

> On 3. Aug 2017, at 17:41, aitozi <gj...@gmail.com> wrote: