Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2018/12/14 16:59:40 UTC

Watermark not firing to push data

Hi,
Observations on Watermarks:
Read this great article:
https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy

* A watermark tells an operator, for a given event timestamp, when to stop
waiting for earlier events to arrive.
* Watermark t means all events with timestamp < t have already arrived.
* When to push data out: when a watermark with timestamp >= t arrives.

Only *using an incrementing current time for the watermark seems to work
correctly*, but I am not sure it lines up correctly with EventTime processing.
*Using the incoming record's intervalStart as the watermark source for
EventTime causes data not to be pushed at all* when I have just 5
records in the source.

My source generates intervalStart values that increment at a regular
interval.
I tried using intervalStart for my watermark with an out-of-order lateness
bound of 3 secs.
The *AggregateFunction* I am using calls add() fine but *never calls
getResult().*
My assumption was that the AggregateFunction would push the data to
getResult() once the watermark derived from intervalStart advanced beyond
the previous watermark t.
But it doesn't. Is it because I have a limited number of input records, so
intervalStart reaches the end of the input too quickly, the watermark stops
advancing, and hence no data is pushed?

With System.currentTimeMillis(), the watermark happily keeps increasing and
hence pushes the data.
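A minimal sketch of the likely cause, in plain Java rather than Flink code. It assumes 5-second tumbling windows aligned to the epoch, the 3-second bound described above, and a watermark equal to the largest timestamp seen minus the bound. The class and method names are hypothetical. It counts how many windows would have their end passed by the final watermark:

```java
import java.util.TreeSet;

public class WatermarkFiringSketch {

    /** Largest timestamp belonging to the tumbling window that contains ts (non-negative ts, no offset). */
    static long windowMaxTimestamp(long ts, long windowSizeMs) {
        long windowStart = ts - (ts % windowSizeMs);
        return windowStart + windowSizeMs - 1;
    }

    /** Number of windows whose max timestamp is at or below the final watermark. */
    static int firedWindows(long[] timestamps, long windowSizeMs, long boundMs) {
        long watermark = Long.MIN_VALUE;
        TreeSet<Long> windowMaxTimestamps = new TreeSet<>();
        for (long ts : timestamps) {
            windowMaxTimestamps.add(windowMaxTimestamp(ts, windowSizeMs));
            // one watermark per record: timestamp minus the out-of-order bound
            watermark = Math.max(watermark, ts - boundMs);
        }
        int fired = 0;
        for (long maxTs : windowMaxTimestamps) {
            if (maxTs <= watermark) {
                fired++;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // 5 records a few milliseconds apart (illustrative values)
        long[] five = {10_000, 10_005, 10_011, 10_018, 10_025};
        System.out.println(firedWindows(five, 5_000, 3_000)); // prints 0

        // 1000 records, 10 ms apart, spanning about 10 seconds of event time
        long[] many = new long[1000];
        for (int i = 0; i < many.length; i++) {
            many[i] = 10_000 + i * 10L;
        }
        System.out.println(firedWindows(many, 5_000, 3_000)); // prints 1
    }
}
```

Under these assumptions, five records spanning 25 ms leave the watermark about 3 seconds behind the first record, so it never reaches the end of the window and nothing fires until more events arrive.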

Created this class:
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MonitoringAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
    private long bound = 3 * 1000; // 3 secs out-of-order bound in millisecs

    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        long nextWatermark = extractedTimestamp - bound;
        // simply emit a Watermark with every event
        return new Watermark(nextWatermark);
    }

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        /* LocalDateTime intervalStart =
               Utils.getLocalDateTime(monitoring.getIntervalStart()); // 2012-07-12 02:21:06.057
           long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
           // using this stopped pushing recs after a certain time
           return extractedTS; */
        return System.currentTimeMillis(); // incrementing current time
    }
}

Re: Watermark not firing to push data

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Hi,
After looking at the code in EventTimeTrigger, I changed the watermark to
System.currentTimeMillis() + boundSecs (5 secs) so that the window's maxTS
was <= the watermark. I was then able to consume from Kinesis when I had
only 50 records.

For a TumblingWindow of 5 secs, the window's maxTS was usually around
currTime + 5 secs.
So I set the watermark to System.currentTimeMillis() + 5 secs.
This way, the trigger fired and execution reached
AggregateFunction.getResult().

@Override
public TriggerResult onElement(Object element, long timestamp,
      TimeWindow window, TriggerContext ctx) throws Exception {
   // <== This check had to be met
   if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
      // if the watermark is already past the window fire immediately
      return TriggerResult.FIRE;
   } else {
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
   }
}
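Setting the watermark from the wall clock makes the trigger fire, but it also decouples the watermark from the event timestamps. One possible alternative, sketched here in plain Java with hypothetical names, is to keep the watermark at the largest event timestamp minus the bound while records arrive, and only let it drift forward once the source has been idle for longer than a timeout. The idle timeout and the drift rule are assumptions for illustration, not behavior Flink provides out of the box. In a Flink job of this era, logic like this would presumably sit inside an AssignerWithPeriodicWatermarks, whose getCurrentWatermark() is called on a timer.

```java
public class IdleAwareWatermarkSketch {
    private final long boundMs;        // out-of-order bound
    private final long idleTimeoutMs;  // how long the source may be quiet before the watermark drifts
    private long maxEventTs = Long.MIN_VALUE;
    private long lastEventWallClockMs = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    public IdleAwareWatermarkSketch(long boundMs, long idleTimeoutMs) {
        this.boundMs = boundMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Record the largest event timestamp seen and the wall-clock time it arrived. */
    public void onEvent(long eventTs, long nowMs) {
        maxEventTs = Math.max(maxEventTs, eventTs);
        lastEventWallClockMs = nowMs;
    }

    /** Watermark to emit at wall-clock time nowMs; never decreases. */
    public long currentWatermark(long nowMs) {
        if (maxEventTs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        long candidate = maxEventTs - boundMs;
        long idleForMs = nowMs - lastEventWallClockMs;
        if (idleForMs > idleTimeoutMs) {
            // source is quiet: advance event time by the idle duration beyond the timeout
            candidate += idleForMs - idleTimeoutMs;
        }
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```

The trade-off is that records arriving after an idle period may be behind the drifted watermark and would then count as late.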


On Mon, Dec 17, 2018 at 10:00 AM Vijay Balakrishnan <bv...@gmail.com>
wrote:


Re: Watermark not firing to push data

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Hi,
Thanks for your reply and the pointers on currentLowWatermark. It looks like
the Flink UI has a Watermarks tab for each operator.

I dump 5 records into the Kinesis Data Stream and try to read the same
records with the FlinkKinesisConsumer, but I am not able to.
In the *MonitoringAssigner* class I generate the watermark
(intervalStart - bound) from the same monitoring.getIntervalStart() value
that I used to generate data on the producer side. On the producer side,
intervalStart increments by 3-10 millisecs on each record. The watermark is
generated as intervalStart - bound (3 secs), so every watermark is greater
than the previous one. So why does it not push data out? Execution reaches
the MGroupingWindowAggregate.add(..) method but never reaches the
MGroupingWindowAggregate.getResult(..) method. It works when I produce
1000 records or so into the Kinesis data stream.
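A back-of-the-envelope sketch of why roughly 1000 records might work where 5 do not. It assumes 5-second tumbling windows aligned to the epoch, a watermark of timestamp minus the 3-second bound, and a fixed event-time step per record. The class and method names are hypothetical, and the numbers are illustrative rather than taken from the job.

```java
public class RequiredSpanSketch {

    /** How far event time must advance past firstTs before the watermark reaches the first window's max timestamp. */
    static long requiredAdvanceMs(long firstTs, long windowSizeMs, long boundMs) {
        long windowMaxTs = firstTs - (firstTs % windowSizeMs) + windowSizeMs - 1;
        // the window fires once ts - bound >= windowMaxTs, i.e. ts >= windowMaxTs + bound
        return windowMaxTs + boundMs - firstTs;
    }

    /** Records needed (including the first) when each record advances event time by stepMs. */
    static long recordsNeeded(long firstTs, long windowSizeMs, long boundMs, long stepMs) {
        long advance = requiredAdvanceMs(firstTs, windowSizeMs, boundMs);
        return 1 + (advance + stepMs - 1) / stepMs; // ceiling division
    }

    public static void main(String[] args) {
        // first record at the very start of a window, 10 ms between records
        System.out.println(recordsNeeded(10_000, 5_000, 3_000, 10)); // prints 801
        // first record at the very end of a window, 10 ms between records
        System.out.println(recordsNeeded(14_999, 5_000, 3_000, 10)); // prints 301
    }
}
```

With steps of 3-10 ms, hundreds of records are needed before the first window can fire under these assumptions, which would be consistent with 5 records producing no output.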

Here is the gist of my code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// FlinkConsumer
Properties kinesisConsumerConfig = new Properties();
        ......
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
        ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
// "new" was missing in the original snippet
FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
        kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig);
final DataStreamSource<Monitoring> monitoringDataStreamSource = env.addSource(kinesisConsumer);
DataStream<Monitoring> kinesisStream =
        monitoringDataStreamSource.assignTimestampsAndWatermarks(
                new MonitoringAssigner(3000)); // code at bottom

org.apache.flink.streaming.api.windowing.time.Time timeWindow = Time.seconds(5);
final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
        kinesisStream.timeWindow(timeWindow);
DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
        windowStream.aggregate(
                new MGroupingWindowAggregate(....), // AggregateFunction impl
                new MGroupingAggregateWindowProcessing(...));

public class MonitoringAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
    private long bound = 3 * 1000; // 3 secs out-of-order bound in millisecs

    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        long nextWatermark = extractedTimestamp - bound;
        return new Watermark(nextWatermark);
    }

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        LocalDateTime intervalStart =
                Utils.getLocalDateTime(monitoring.getIntervalStart()); // 2012-07-12 02:21:06.057
        long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
        return extractedTS;
        // return System.currentTimeMillis(); // this works fine.
    }
}

TIA,
Vijay

On Sat, Dec 15, 2018 at 5:42 AM Hequn Cheng <ch...@gmail.com> wrote:


Re: Watermark not firing to push data

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Vijay,

Could you provide more information about your problem? For example:
- Which kind of window do you use?
- What's the window size?
- A relatively complete code sample would help :-)

As for the problem, the event time has probably not reached the end
of the window. You can monitor the watermark in the web dashboard[1].
Also, changing event time to processing time is another way to verify
whether it is a watermark problem.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html

