Posted to user@flink.apache.org by fu...@oracle.com on 2020/11/10 23:27:56 UTC

BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

Hi Experts,

I am trying to implement a KeyedProcessFunction with an onTimer() 
callback. I need to use event time, and I am having trouble making 
the watermark available to my operator. I am seeing some strange behavior.

I have a joined retract stream without watermark or timestamp 
information, and I need to assign timestamps and watermarks to it. The 
timestamp is just a field in the stream. My problems are with the watermark generator.

Problem:

1. I can make the time-lag watermark generator work. But with the 
bounded-out-of-orderness generator, 
context.timerService().currentWatermark() in processElement() always 
stays at its initial value and never updates.

2. I set the auto-watermark interval to 5 seconds for debugging, and I 
attach this watermark generator in only one place, with parallelism 1. 
However, I get 8 log records per emit. The time-lag policy advances 
all 8 records; the out-of-orderness policy advances only 1 record. Maybe 
this mismatch causes processElement() to see the 
default watermark?

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator

This is my code for watermark generator:

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

@Slf4j
public class PeriodicTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    private final long maxTimeLag = 15000;
    private transient long currentMaxTimestamp = 15000;

    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput output) {
        // the eventTimestamp is obtained through the TimestampAssigner
        // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Policy 1: time-lag strategy, works and advances the timestamp
        long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);
        output.emitWatermark(new Watermark(watermarkEpochTime));

        // Policy 2: periodic emit based on event
        long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
        // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

        log.info("Emit Watermark: watermark based on system time: {}, periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
    }
}
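
For comparison, the arithmetic behind "Policy 2" can be sketched in plain Java with no Flink dependency. The class name BoundedLagSketch and its methods are hypothetical; the formula and the initial value of 15000 are copied from the generator above, and the sample timestamp is the one that appears in the log further down.

```java
/** Plain-Java sketch of the "Policy 2" arithmetic; no Flink classes are used. */
final class BoundedLagSketch {
    private final long maxTimeLag;
    private long currentMaxTimestamp;

    BoundedLagSketch(long maxTimeLag, long initialMaxTimestamp) {
        this.maxTimeLag = maxTimeLag;
        this.currentMaxTimestamp = initialMaxTimestamp;
    }

    /** Mirrors onEvent(): remember the largest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    /** Mirrors the commented-out Policy 2 line in onPeriodicEmit(). */
    long currentWatermark() {
        return currentMaxTimestamp - maxTimeLag;
    }

    public static void main(String[] args) {
        BoundedLagSketch idle = new BoundedLagSketch(15000L, 15000L);
        BoundedLagSketch busy = new BoundedLagSketch(15000L, 15000L);
        busy.onEvent(1605047187881L); // timestamp taken from the log in this email
        System.out.println("instance without events: " + idle.currentWatermark());
        System.out.println("instance with one event: " + busy.currentWatermark());
    }
}
```

An instance that never sees an event stays at its initial value minus the lag, while an instance that sees the event trails it by exactly maxTimeLag. Under Policy 2 the watermark depends only on events, never on the wall clock.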


This is the log printed by the slf4j logger above. On every emit it gives 
me 8 records. Why 8? I think it should be 1 in theory, so I 
am very confused. Also, policy 1 advances all 8 records, while policy 
2 advances only 1 of the 8, and the advance is not reflected in processElement().

14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000

14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
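
One possible reading of these eight lines, offered as an assumption that the log does not confirm: the generator may be running as eight parallel instances, of which only one receives events. In the pipeline code quoted later in the thread, setParallelism(1) follows addSink(...), so it would apply to the sink only. An operator with several input channels takes the minimum of the latest watermarks across those channels, so a single advancing instance cannot move the downstream watermark on its own. The plain-Java sketch below illustrates only that minimum rule; MinWatermarkSketch and its methods are hypothetical names, not Flink classes.

```java
import java.util.Arrays;

/** Plain-Java sketch: a downstream watermark is the minimum over all input channels. */
final class MinWatermarkSketch {
    private final long[] channelWatermarks;

    MinWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        // Every channel starts at the lowest possible watermark.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return combined();
    }

    /** The combined watermark is held back by the slowest channel. */
    long combined() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        // Policy 2 values from the log: seven instances report 0, one reports 1605047172881.
        MinWatermarkSketch policy2 = new MinWatermarkSketch(8);
        for (int i = 0; i < 8; i++) {
            policy2.onWatermark(i, 0L);
        }
        policy2.onWatermark(1, 1605047172881L);
        System.out.println("policy 2 combined watermark: " + policy2.combined());

        // Policy 1 values from the log: every instance reports roughly "now minus lag".
        MinWatermarkSketch policy1 = new MinWatermarkSketch(8);
        for (int i = 0; i < 8; i++) {
            policy1.onWatermark(i, 1605047266198L);
        }
        System.out.println("policy 1 combined watermark: " + policy1.combined());
    }
}
```

If that assumption holds, two things may be worth trying: run the timestamp/watermark operator itself with parallelism 1, or mark inputs without data as idle through WatermarkStrategy#withIdleness. Neither has been verified against this job.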


Any insights? Thank you very much!


Best,

Fuyao


Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

Posted by fu...@oracle.com.
The test workflow attachment was not included in the previous email; sorry 
for the confusion. Please refer to the text description of the workflow instead. Thanks.


> On 11/11/20 11:33, fuyao.li@oracle.com wrote:
>>
>> Hi Community,
>>
>>
>> Regarding this problem, could someone give me an explanation? Thanks.
>>
>> Best,
>>
>> Fuyao
>>
>> On 11/10/20 16:56, fuyao.li@oracle.com wrote:
>>>
>>> Hi Kevin,
>>>
>>> Sorry for the name typo...
>>>
>>> On 11/10/20 16:48, fuyao.li@oracle.com wrote:
>>>>
>>>> Hi Kavin,
>>>>
>>>> Thanks for your example. I think I have already done something very 
>>>> similar. I didn't post the full WatermarkStrategy 
>>>> implementation in my previous email, but I do have that part already. I 
>>>> think the example you gave is a punctuated watermark strategy, not a 
>>>> bounded-out-of-orderness one. My main concerns now are why my 
>>>> emitted watermark is not available in processElement() and why I 
>>>> get 8 records each time the code reaches the onPeriodicEmit() 
>>>> part. I will post my code, following your example, below.
>>>>
>>>> The symptom is that the context watermark is 
>>>> Long.MIN_VALUE if I use the watermark strategy below.
>>>>
>>>> 16:35:12,969 INFO 
>>>> org.myorg.quickstart.processor.TableOutputProcessFunction - context 
>>>> current key: 69215, context current watermark: -9223372036854775808
>>>>
>>>>
>>>> DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
>>>> retractStream
>>>>     .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
>>>>     .keyBy(
>>>>         value -> {
>>>>             String invoice_id_key = (String) value.f1.getField(0);
>>>>             if (invoice_id_key == null) {
>>>>                 invoice_id_key = (String) value.f1.getField(4);
>>>>             }
>>>>             return invoice_id_key;
>>>>         })
>>>>     .process(new TableOutputProcessFunction())
>>>>     .name("ProcessTableOutput")
>>>>     .uid("ProcessTableOutput")
>>>>     .addSink(businessObjectSink)
>>>>     .name("businessObjectSink")
>>>>     .uid("businessObjectSink")
>>>>     .setParallelism(1);
>>>>
>>>> watermark strategy:
>>>>
>>>> public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
>>>>     @Override
>>>>     public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>>         return new PunctuatedTableOutputWatermarkGenerator();
>>>>     }
>>>>
>>>>     @Override
>>>>     public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
>>>>         log.info("Inside timestamp assigner");
>>>>         return (booleanRowTuple2, previousElementTimestamp) -> {
>>>>             return my timestamp;
>>>>         };
>>>>     }
>>>> }
>>>>
>>>> watermark generator code:
>>>>
>>>> public class PunctuatedTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
>>>>     @Override
>>>>     public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput watermarkOutput) {
>>>>         watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
>>>>         log.info("Emit Punctuated watermark: {}", eventTimestamp);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>         // don't need to do anything because we emit in reaction to events above
>>>>     }
>>>> }
>>>>
>>>> 16:35:13,584 INFO 
>>>> org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator 
>>>> - Emit Punctuated watermark: 1605054900905
>>>>
>>>> From the log, I can see that it extracts the eventTimestamp and emits 
>>>> the watermark. Why can't I access this information in the 
>>>> processElement() function?
>>>>
>>>> Any suggestions? Thank you so much!
>>>>
>>>>
>>>> Best regards,
>>>>
>>>> Fuyao
>>>>
>>>>
>>>>
>>>> On 11/10/20 16:04, Kevin Kwon wrote:
>>>>> Hi Fuyao, I think you need to implement your own 
>>>>> WatermarkStrategy class and register it with 
>>>>> window.assignTimestampsAndWatermarks(new 
>>>>> YourEventWatermarkStrategy)
>>>>>
>>>>> Make sure you use the KafkaConsumer's assignTimestampsAndWatermarks 
>>>>> if you're using Kafka consumers.
>>>>>
>>>>> Example code for a booking event that has its own internal 
>>>>> timestamp would be:
>>>>>
>>>>> public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {
>>>>>
>>>>>   @Override
>>>>>   public WatermarkGenerator<Booking> createWatermarkGenerator(
>>>>>       WatermarkGeneratorSupplier.Context context
>>>>>   ) {
>>>>>     return new WatermarkGenerator<Booking>() {
>>>>>       private final long OUT_OF_ORDERNESS_MILLIS = 30;
>>>>>       private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;
>>>>>
>>>>>       @Override
>>>>>       public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
>>>>>         currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
>>>>>         Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
>>>>>         output.emitWatermark(watermark);
>>>>>       }
>>>>>
>>>>>       @Override
>>>>>       public void onPeriodicEmit(WatermarkOutput output) {
>>>>>         // Do nothing since watermark will be emitted every event
>>>>>       }
>>>>>     };
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public TimestampAssigner<Booking> createTimestampAssigner(
>>>>>       TimestampAssignerSupplier.Context context
>>>>>   ) {
>>>>>     return (booking, recordTimestamp) -> booking.getTimestamp();
>>>>>   }
>>>>> }
>>>>>

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

Posted by fu...@oracle.com.
Hi All,

Just to add a little more context to the problem: I have a full outer 
join operation before this stage. The source data streams for the full outer 
join come from Kafka. I also added timestamps and watermarks to the 
FlinkKafkaConsumer. That made no difference to the result; 
I still cannot make the watermark advance.

overall workflow:

two Kafka topics -> two data streams in Flink -> join them and 
convert to a retract stream -> apply a KeyedProcessFunction that schedules 
event-time timers and runs the onTimer logic -> push to downstream sink.

I think there are no issues with my syntax. But I still could NOT make 
the watermark advance in event time using the bounded-out-of-orderness 
strategy. (In a Flink cluster, the behavior is different: the watermark is 
advancing, but onTimer is still not triggered correctly.) :(
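
As background for the onTimer symptom: an event-time timer fires only once the operator's watermark reaches or passes the timer's timestamp, so a stalled watermark means no callback. The sketch below models that rule in plain Java. EventTimeTimerSketch is a hypothetical stand-in, not Flink's timer service, and it ignores keys and timer de-duplication.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java sketch: event-time timers fire when the watermark passes them. */
final class EventTimeTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark and returns the timers that fire, in timestamp order. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // watermarks never move backwards
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        service.registerEventTimeTimer(1605047187881L + 60_000L); // hypothetical one-minute timeout
        System.out.println("fired at stalled watermark 0: " + service.advanceWatermark(0L));
        System.out.println("fired after the watermark passes: "
                + service.advanceWatermark(1605047187881L + 60_000L));
    }
}
```

In this model a timer registered in processElement() can only fire after a later watermark moves past its timestamp. When onTimer stays silent, comparing the timer timestamp with the value of currentWatermark() is a reasonable first check.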

I guess the reason is that I receive 8 records for each round of 
onPeriodicEmit(), and only one of the eight is updated with the 
bounded-out-of-orderness strategy. With the time-lag strategy (see the first 
email in the thread), they are all updated, so the 
watermark advances. I just don't know why I get 8 records every time 
even though I have parallelism set to 1. (The logs are in the first email 
in the thread.)

I also tried to debug inside Flink web interface based on the link: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/debugging_event_time.html

The logs produced by a local Flink cluster are different from those produced by directly 
starting my application. *Why is the behavior inconsistent?* The 
context timestamp sticks to Long.MIN_VALUE during IDE debugging; however, it 
is updated correctly in the Flink cluster, except that the first record 
sees the default value. But I am still not getting the scheduled 
logic triggered correctly inside the onTimer method. My test workflow 
can be seen in the attachment. I have read through previous archive threads 
about the watermark not updating (stuck at Long.MIN_VALUE), but they don't 
help much in my case. Thanks.
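
The remark that only the first record sees the default value is consistent with how watermarks travel. As far as the TimestampsAndWatermarksOperator source linked in the first email shows, each record is forwarded downstream before it is handed to the generator's onEvent(), so a watermark derived from a record arrives after that record. The plain-Java sketch below models this ordering for a single input channel and a punctuated generator that emits the event timestamp as the watermark. RecordBeforeWatermarkSketch is a hypothetical name, and the single-channel setup is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch: a record reaches processElement() before its own watermark. */
final class RecordBeforeWatermarkSketch {

    /** Returns the watermark a downstream operator would observe at each record. */
    static List<Long> observedWatermarks(long[] eventTimestamps) {
        List<Long> observed = new ArrayList<>();
        long downstreamWatermark = Long.MIN_VALUE;
        for (long timestamp : eventTimestamps) {
            // The record arrives first and sees the watermark from earlier records only.
            observed.add(downstreamWatermark);
            // The watermark derived from this record arrives afterwards.
            downstreamWatermark = Math.max(downstreamWatermark, timestamp);
        }
        return observed;
    }

    public static void main(String[] args) {
        long[] timestamps = {100L, 200L, 150L};
        System.out.println(observedWatermarks(timestamps));
    }
}
```

In this model the first record always observes Long.MIN_VALUE, and each later record observes the maximum timestamp of the records before it.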


Best,

Fuyao



On 11/11/20 11:33, fuyao.li@oracle.com wrote:
>
> Hi Community,
>
>
> Regarding this problem, could someone give me an explanation? Thanks.
>
> Best,
>
> Fuyao
>
> On 11/10/20 16:56, fuyao.li@oracle.com wrote:
>>
>> Hi Kevin,
>>
>> Sorry for the name typo...
>>
>> On 11/10/20 16:48, fuyao.li@oracle.com wrote:
>>>
>>> Hi Kavin,
>>>
>>> Thanks for your example. I think I have already done something very 
>>> very similar before. I didn't post the full WatermarkStrategy 
>>> interface in my previous email, but I do have that part already. I 
>>> think the example you gave me is a punctuatedWatermarkStrategy, not 
>>> boundoutoforderness one. My major concern now is that why my emitted 
>>> watermark is not available in processElement() and why I have 8 
>>> records for each time the code reaches the onPeriodicEmit part. I 
>>> will post my code following your example below.
>>>
>>> The symptom is that I will get the context watermark as 
>>> LONG.MIN_VALUE if I use the watermark strategy below.
>>>
>>> 16:35:12,969 INFO 
>>> org.myorg.quickstart.processor.TableOutputProcessFunction - context 
>>> current key: 69215, context current watermark: -9223372036854775808
>>>
>>>
>>> DataStream<Tuple2<Boolean, Row>> retractStream =tEnv.toRetractStream(table, Row.class);
>>> retractStream
>>>      .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
>>>      .keyBy(
>>>          value -> {String invoice_id_key = (String) value.f1.getField(0); if (invoice_id_key ==null) {
>>>              invoice_id_key = (String) value.f1.getField(4); }
>>>            return invoice_id_key; })
>>>      .process(new TableOutputProcessFunction())
>>>      .name("ProcessTableOutput")
>>>      .uid("ProcessTableOutput")
>>>      .addSink(businessObjectSink)
>>>      .name("businessObjectSink")
>>>      .uid("businessObjectSink")
>>>      .setParallelism(1);
>>>
>>> watermark strategy:
>>>
>>> public class PunctuatedWatermarkStrategyimplements WatermarkStrategy<Tuple2<Boolean, Row>> {
>>>      @Override public WatermarkGenerator<Tuple2<Boolean, Row>>createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>          return new PunctuatedTableOutputWatermarkGenerator(); }
>>>
>>>      @Override public TimestampAssigner<Tuple2<Boolean, Row>>createTimestampAssigner(TimestampAssignerSupplier.Context context) {
>>>          log.info("Inside timestamp assigner"); return (booleanRowTuple2, previousElementTimestamp) -> {
>>>              return my timestamp; }; }
>>> }
>>>
>>> watermark generator code:
>>>
>>> public class PunctuatedTableOutputWatermarkGeneratorimplements WatermarkGenerator<Tuple2<Boolean, Row>> {
>>>      @Override public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput watermarkOutput) {
>>>          watermarkOutput.emitWatermark(new Watermark(eventTimestamp)); log.info("Emit Punctuated watermark: {}", eventTimestamp); }
>>>
>>>      @Override public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>          // don't need to do anything because we emit in reaction to events 
>>> above }
>>> }
>>>
>>> 16:35:13,584 INFO 
>>> org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator 
>>> - Emit Punctuated watermark: 1605054900905
>>>
>>> From the log, I can see, it extract the eventTimestamp and emits the 
>>> watermark. Why i can't access this piece of information in 
>>> processElement() function.
>>>
>>> Any suggestions? Thank you so much!
>>>
>>>
>>> Best regards,
>>>
>>> Fuyao
>>>
>>>
>>>
>>> On 11/10/20 16:04, Kevin Kwon wrote:
>>>> Hi Fuyao, I think you need to implement your own 
>>>> /WatermarkStrategy/ class and register that to 
>>>> /window/./assignTimestampsAndWatermarks(new 
>>>> YourEventWatermarkStrategy)/
>>>> /
>>>> /
>>>> Make sure you use KafkaConsumer's /assignTimestampsAndWatermarks 
>>>> /if you're using Kafka consumers
>>>> /
>>>> /
>>>> an example code for a booking event that has it's internal 
>>>> timestamp would be
>>>>
>>>> public class BookingWatermarkStrategyimplements WatermarkStrategy<Booking> {
>>>>
>>>>    @Override public WatermarkGenerator<Booking>createWatermarkGenerator(
>>>>        WatermarkGeneratorSupplier.Context context
>>>>    ) {
>>>>      return new WatermarkGenerator<Booking>() {
>>>>        private final long OUT_OF_ORDERNESS_MILLIS =30; private long currentMaxTimestamp = Long.MIN_VALUE +OUT_OF_ORDERNESS_MILLIS +1; @Override public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
>>>>          currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp()); Watermark watermark =new Watermark(currentMaxTimestamp -OUT_OF_ORDERNESS_MILLIS -1); output.emitWatermark(watermark); }
>>>>
>>>>        @Override public void onPeriodicEmit(WatermarkOutput output) {
>>>>          // Do nothing since watermark will be emitted every event }
>>>>      }; }
>>>>
>>>>    @Override public TimestampAssigner<Booking>createTimestampAssigner(
>>>>        TimestampAssignerSupplier.Context context
>>>>    ) {
>>>>      return (booking, recordTimestamp) -> booking.getTimestamp(); }
>>>> }

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

Posted by fu...@oracle.com.
Hi Community,


Regarding this problem, could someone give me an explanation? Thanks.

Best,

Fuyao
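
One possible reading of the symptoms quoted below, offered as a sketch under assumptions rather than a confirmed diagnosis: in the quoted pipeline, setParallelism(1) is chained after addSink(), so it applies to the sink only, and the timestamp/watermark operator would then run with the default parallelism. The eight log lines per interval suggest a default parallelism of 8, though the thread does not confirm this. An operator that receives watermarks from several upstream subtasks advances its event-time clock to the minimum of the latest watermark from each of them, so subtasks that never see an event hold it back. The plain-Java model below has no Flink dependency; the class CombinedWatermarkSketch and its method are hypothetical names used only for illustration.

```java
import java.util.Arrays;

/** Plain-Java model (not Flink API) of how a downstream operator combines input watermarks. */
final class CombinedWatermarkSketch {

    /** The operator's event-time clock is the minimum of the latest watermark per input. */
    static long combinedWatermark(long[] latestPerInput) {
        return Arrays.stream(latestPerInput).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        int parallelism = 8; // assumption: inferred from the 8 log lines per interval
        long[] inputs = new long[parallelism];

        // Punctuated case: only one subtask sees events; the others never emit.
        Arrays.fill(inputs, Long.MIN_VALUE);
        inputs[1] = 1605054900905L;
        System.out.println(combinedWatermark(inputs)); // prints -9223372036854775808

        // Event-based periodic case: idle subtasks emit 15000 - 15000 = 0.
        Arrays.fill(inputs, 0L);
        inputs[1] = 1605047172881L;
        System.out.println(combinedWatermark(inputs)); // prints 0

        // Time-lag case: every subtask emits a wall-clock-based watermark.
        Arrays.fill(inputs, 1605047266198L);
        inputs[1] = 1605047266199L;
        System.out.println(combinedWatermark(inputs)); // prints 1605047266198
    }
}
```

If this reading applies, two things may be worth trying: calling setParallelism(1) directly on the operator returned by assignTimestampsAndWatermarks(), or marking idle inputs with WatermarkStrategy.withIdleness(Duration), which is available in Flink 1.11. Whether either fits depends on the job.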

On 11/10/20 16:56, fuyao.li@oracle.com wrote:
>
> Hi Kevin,
>
> Sorry for the name typo...
>
> On 11/10/20 16:48, fuyao.li@oracle.com wrote:
>>
>> Hi Kavin,
>>
>> Thanks for your example. I think I have already done something very
>> similar. I didn't post the full WatermarkStrategy implementation in my
>> previous email, but I do have that part already. I think the example
>> you gave me is a punctuated watermark strategy, not a bounded
>> out-of-orderness one. My main concern now is why my emitted watermark
>> is not available in processElement(), and why I get 8 log records each
>> time the code reaches onPeriodicEmit(). I will post my code, following
>> your example, below.
>>
>> The symptom is that the context watermark is Long.MIN_VALUE if I use
>> the watermark strategy below.
>>
>> 16:35:12,969 INFO 
>> org.myorg.quickstart.processor.TableOutputProcessFunction - context 
>> current key: 69215, context current watermark: -9223372036854775808
>>
>>
>> DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
>> retractStream
>>      .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
>>      .keyBy(
>>          value -> {
>>            String invoice_id_key = (String) value.f1.getField(0);
>>            if (invoice_id_key == null) {
>>              invoice_id_key = (String) value.f1.getField(4);
>>            }
>>            return invoice_id_key;
>>          })
>>      .process(new TableOutputProcessFunction())
>>      .name("ProcessTableOutput")
>>      .uid("ProcessTableOutput")
>>      .addSink(businessObjectSink)
>>      .name("businessObjectSink")
>>      .uid("businessObjectSink")
>>      .setParallelism(1);
>>
>> watermark strategy:
>>
>> public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
>>      @Override
>>      public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>          return new PunctuatedTableOutputWatermarkGenerator();
>>      }
>>
>>      @Override
>>      public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
>>          log.info("Inside timestamp assigner");
>>          return (booleanRowTuple2, previousElementTimestamp) -> {
>>              return my timestamp;
>>          };
>>      }
>> }
>>
>> watermark generator code:
>>
>> public class PunctuatedTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
>>      @Override
>>      public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput watermarkOutput) {
>>          watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
>>          log.info("Emit Punctuated watermark: {}", eventTimestamp);
>>      }
>>
>>      @Override
>>      public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>          // don't need to do anything because we emit in reaction to events above
>>      }
>> }
>>
>> 16:35:13,584 INFO 
>> org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator 
>> - Emit Punctuated watermark: 1605054900905
>>
>> From the log, I can see that it extracts the eventTimestamp and emits
>> the watermark. Why can't I access this information in the
>> processElement() function?
>>
>> Any suggestions? Thank you so much!
>>
>>
>> Best regards,
>>
>> Fuyao
>>
>>
>>
>> On 11/10/20 16:04, Kevin Kwon wrote:
>>> Hi Fuyao, I think you need to implement your own WatermarkStrategy
>>> class and register it with
>>> window.assignTimestampsAndWatermarks(new YourEventWatermarkStrategy)
>>>
>>> Make sure you use the KafkaConsumer's assignTimestampsAndWatermarks if
>>> you're using Kafka consumers.
>>>
>>> Example code for a booking event that has its own internal timestamp
>>> would be:
>>>
>>> public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {
>>>
>>>    @Override
>>>    public WatermarkGenerator<Booking> createWatermarkGenerator(
>>>        WatermarkGeneratorSupplier.Context context
>>>    ) {
>>>      return new WatermarkGenerator<Booking>() {
>>>        private final long OUT_OF_ORDERNESS_MILLIS = 30;
>>>        private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;
>>>
>>>        @Override
>>>        public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
>>>          currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
>>>          Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
>>>          output.emitWatermark(watermark);
>>>        }
>>>
>>>        @Override
>>>        public void onPeriodicEmit(WatermarkOutput output) {
>>>          // Do nothing since watermark will be emitted every event
>>>        }
>>>      };
>>>    }
>>>
>>>    @Override
>>>    public TimestampAssigner<Booking> createTimestampAssigner(
>>>        TimestampAssignerSupplier.Context context
>>>    ) {
>>>      return (booking, recordTimestamp) -> booking.getTimestamp();
>>>    }
>>> }
>>>
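
For comparison, the bounded-out-of-orderness bookkeeping in the example above can be modelled in plain Java without any Flink dependency. This is a minimal sketch, not the library's implementation; the class BoundedOutOfOrdernessSketch and its method names are hypothetical. It keeps the seeding from the example (Long.MIN_VALUE + bound + 1), so that a generator which has seen no events reports Long.MIN_VALUE instead of an arbitrary value. By contrast, seeding currentMaxTimestamp with 15000 and subtracting a 15000 ms lag yields a watermark of 0 on subtasks that never receive an event, which matches the "periodicEmitWatermarkTime: 0" lines in the original post's log.

```java
/** Plain-Java model (not Flink API) of a bounded-out-of-orderness watermark generator. */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Seed so that the first computed watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Called once per record: remember the largest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Value that a periodic emit would send downstream. */
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(15_000L);
        System.out.println(generator.currentWatermark()); // prints -9223372036854775808

        generator.onEvent(1605047187881L);
        System.out.println(generator.currentWatermark()); // prints 1605047172880

        // An out-of-order (older) event never moves the watermark backwards.
        generator.onEvent(1605047180000L);
        System.out.println(generator.currentWatermark()); // prints 1605047172880
    }
}
```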