Posted to user@flink.apache.org by fu...@oracle.com on 2020/11/10 23:27:56 UTC
BoundedOutOfOrderness Watermark Generator is NOT making the event
time to advance
Hi Experts,
I am trying to implement a KeyedProcessFunction with an onTimer()
callback. I need to use event time, and I am having trouble making
the watermark available to my operator. I am seeing some strange behavior.
I have a joined retract stream without watermark or timestamp
information, and I need to assign timestamps and watermarks to it. The
timestamp is just a field in the stream. My problems are in the watermark
generator part.
Problem:
1. I can use a time-lag watermark generator and make it work. But with a
BoundedOutOfOrderness generator, the
context.timerService().currentWatermark() in processElement() always
sticks to the initial value and never updates.
2. I set the auto-watermark interval to 5 seconds for debugging purposes. I
attach this watermark generator in only one place, with parallelism 1.
However, I am getting 8 records at a time. The time-lag policy advances
all 8 records; the out-of-orderness policy advances only 1 record. Maybe
the mismatch is causing processElement() to capture the wrong
default watermark?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
This is my code for watermark generator:
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

@Slf4j
public class PeriodicTableOutputWatermarkGenerator
        implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    private final long maxTimeLag = 15000;
    private transient long currentMaxTimestamp = 15000;

    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp,
            WatermarkOutput output) {
        // the eventTimestamp is obtained through the TimestampAssigner, see
        // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Policy 1: time-lag strategy; this works and advances the watermark
        long watermarkEpochTime =
                Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);
        output.emitWatermark(new Watermark(watermarkEpochTime));
        // Policy 2: periodic emit based on event timestamps
        long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
        // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));
        log.info("Emit Watermark: watermark based on system time: {}, "
                        + "periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
    }
}
This is the log printed by the slf4j logger above. Every time, it gives
me 8 records. Why 8 records? I think it should be 1 in theory. I
am very confused. Also, policy 1 advances all 8 records, while policy
2 advances only 1 of the 8 records, and that is not reflected in
processElement().
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266199,
periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266199,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
Any insights? Thank you very much!
Best,
Fuyao
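One possible reading of the log in the post above, offered as a sketch rather than a diagnosis: the eight lines per round look like eight parallel instances of the watermark generator, of which only one has seen events. An operator that receives watermarks from several upstream instances advances its own watermark to the minimum of them. The plain-Java sketch below has no Flink dependency; the class MinWatermarkSketch and its method names are hypothetical, and the eight-instance layout is an assumption. It replays the values from the log under both policies.

```java
import java.util.Arrays;

public class MinWatermarkSketch {

    /** An operator's watermark is the minimum over the watermarks of its inputs. */
    public static long combine(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /** Policy 1 from the post: follow the wall clock, minus a fixed lag. */
    public static long timeLagWatermark(long nowMillis, long lag, long maxTimestamp) {
        return Math.max(nowMillis - lag, maxTimestamp);
    }

    /** Policy 2 from the post: largest event timestamp seen so far, minus the lag. */
    public static long eventBasedWatermark(long lag, long maxTimestamp) {
        return maxTimestamp - lag;
    }

    public static void main(String[] args) {
        long lag = 15000L;
        long now = 1605047266199L;          // wall-clock value taken from the log
        long[] maxTimestamps = new long[8]; // assumption: eight generator instances
        Arrays.fill(maxTimestamps, 15000L); // initial value of currentMaxTimestamp
        maxTimestamps[1] = 1605047187881L;  // the one instance that saw events

        long[] policy1 = new long[8];
        long[] policy2 = new long[8];
        for (int i = 0; i < 8; i++) {
            policy1[i] = timeLagWatermark(now, lag, maxTimestamps[i]);
            policy2[i] = eventBasedWatermark(lag, maxTimestamps[i]);
        }
        System.out.println("policy 1 combined: " + combine(policy1)); // 1605047251199
        System.out.println("policy 2 combined: " + combine(policy2)); // 0
    }
}
```

Under this assumption, policy 2 leaves the combined watermark at 0 because the seven idle instances never move past their initial value, while policy 1 advances it because every instance follows the system clock.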
Re: BoundedOutOfOrderness Watermark Generator is NOT making the event
time to advance
Posted by fu...@oracle.com.
The test workflow attachment was not added to the previous email. Sorry
for the confusion; please refer to the workflow described in the text. Thanks.
On 11/12/20 16:17, fuyao.li@oracle.com wrote:
> [...]
Re: BoundedOutOfOrderness Watermark Generator is NOT making the event
time to advance
Posted by fu...@oracle.com.
Hi All,
Just to add a little more context to the problem: I have a full outer
join operation before this stage. The source data streams for the full outer
join are Kafka sources. I also added timestamps and watermarks to the
FlinkKafkaConsumer. That made no difference to the result;
I still cannot make the watermark advance.
Overall workflow:
two Kafka topics -> two data streams in Flink -> join them together and
convert to a retract stream -> apply a KeyedProcessFunction that schedules an
event-time timer and runs the onTimer logic -> push to the downstream sink.
I think there are no issues with my syntax, but I still could NOT make
the watermark advance in event time using the bounded-out-of-orderness
strategy. (In a Flink cluster the behavior is different: the watermark is
advancing, but onTimer is still not triggered correctly. :( )
I guess the reason is that I receive 8 records for each round of
onPeriodicEmit(), and only one of the eight is updated with the
BoundedOutOfOrderness strategy. With the time-lag strategy (refer to the first
email in the thread), all of them are updated, so the
watermark advances. I just don't know why I get 8 records every time
even though I have parallelism set to 1. (Logs can be found in the first email
in the thread.)
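For comparison, here is a minimal plain-Java sketch of the bounded-out-of-orderness rule, using the same initialization as the BookingWatermarkStrategy example quoted later in this thread. The class BoundedOutOfOrdernessSketch and its method names are hypothetical, and it does not depend on Flink. It illustrates that a generator which has seen no events reports Long.MIN_VALUE, and that a late event does not move the watermark backwards.

```java
public class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start just high enough that the first watermark is Long.MIN_VALUE
        // and the subtraction in currentWatermark() cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Track the largest event timestamp seen so far. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark trails the largest timestamp by the allowed out-of-orderness. */
    public long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(15000L);
        System.out.println(generator.currentWatermark()); // Long.MIN_VALUE: no events yet

        generator.onEvent(1605047187881L);
        generator.onEvent(1605047180000L); // out-of-order event, does not lower the maximum
        System.out.println(generator.currentWatermark()); // 1605047172880
    }
}
```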
I also tried to debug inside the Flink web interface, following this link:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/debugging_event_time.html
The logs produced by the local Flink cluster are different from those produced
when starting my application directly. *Why is the behavior inconsistent?* The
context watermark sticks to Long.MIN_VALUE when debugging in the IDE; however, it
is updated correctly in the Flink cluster, except for the first record,
which has the default value. Still, I am not getting the scheduled
logic triggered correctly inside the onTimer method. My test workflow
can be seen in the attachment. I have read through previous archive threads
about the watermark not updating (sticking to Long.MIN_VALUE), but they don't
help much in my case. Thanks.
Best,
Fuyao
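Since the open question in the post above is why onTimer is not triggered, it may help to restate the rule for event-time timers: a timer fires only once the operator's watermark has reached the timer's timestamp. The sketch below is plain Java with hypothetical names (EventTimeTimerSketch is not Flink's timer service) and only illustrates that rule under simplified assumptions, namely a single key and no state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class EventTimeTimerSketch {
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Register a timer that should fire once event time reaches the timestamp. */
    public void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advance the watermark and return the timers that fire, in timestamp order. */
    public List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // watermarks never move backwards
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        service.registerEventTimeTimer(1605047200000L);

        // A watermark stuck at a low value never reaches the timer.
        System.out.println(service.advanceWatermark(0L)); // []

        // Once the watermark passes the timer's timestamp, the timer fires.
        System.out.println(service.advanceWatermark(1605047251199L)); // [1605047200000]
    }
}
```

In this simplified model, a watermark that stays at its initial value, as in the IDE run described above, means no event-time timer can ever fire.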
On 11/11/20 11:33, fuyao.li@oracle.com wrote:
>
> Hi Community,
>
>
> Regarding this problem, could someone give me an explanation? Thanks.
>
> Best,
>
> Fuyao
>
> On 11/10/20 16:56, fuyao.li@oracle.com wrote:
>>
>> Hi Kevin,
>>
>> Sorry for the name typo...
>>
>> On 11/10/20 16:48, fuyao.li@oracle.com wrote:
>>>
>>> Hi Kavin,
>>>
>>> Thanks for your example. I think I have already done something very
>>> similar before. I didn't post the full WatermarkStrategy
>>> interface in my previous email, but I do have that part already. I
>>> think the example you gave me is a punctuated watermark strategy, not
>>> a bounded-out-of-orderness one. My main concern now is why my emitted
>>> watermark is not available in processElement() and why I get 8
>>> records each time the code reaches the onPeriodicEmit part. I
>>> will post my code, following your example, below.
>>>
>>> The symptom is that I get the context watermark as
>>> Long.MIN_VALUE if I use the watermark strategy below.
>>>
>>> 16:35:12,969 INFO
>>> org.myorg.quickstart.processor.TableOutputProcessFunction - context
>>> current key: 69215, context current watermark: -9223372036854775808
>>>
>>>
>>> DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
>>> retractStream
>>>     .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
>>>     .keyBy(
>>>         value -> {
>>>             String invoice_id_key = (String) value.f1.getField(0);
>>>             if (invoice_id_key == null) {
>>>                 invoice_id_key = (String) value.f1.getField(4);
>>>             }
>>>             return invoice_id_key;
>>>         })
>>>     .process(new TableOutputProcessFunction())
>>>     .name("ProcessTableOutput")
>>>     .uid("ProcessTableOutput")
>>>     .addSink(businessObjectSink)
>>>     .name("businessObjectSink")
>>>     .uid("businessObjectSink")
>>>     .setParallelism(1);
>>>
>>> watermark strategy:
>>>
>>> public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
>>>     @Override
>>>     public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>         return new PunctuatedTableOutputWatermarkGenerator();
>>>     }
>>>
>>>     @Override
>>>     public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
>>>         log.info("Inside timestamp assigner");
>>>         return (booleanRowTuple2, previousElementTimestamp) -> {
>>>             return my timestamp;
>>>         };
>>>     }
>>> }
>>>
>>> watermark generator code:
>>>
>>> public class PunctuatedTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
>>>     @Override
>>>     public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput watermarkOutput) {
>>>         watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
>>>         log.info("Emit Punctuated watermark: {}", eventTimestamp);
>>>     }
>>>
>>>     @Override
>>>     public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>         // don't need to do anything because we emit in reaction to events above
>>>     }
>>> }
>>>
>>> 16:35:13,584 INFO
>>> org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator
>>> - Emit Punctuated watermark: 1605054900905
>>>
>>> From the log, I can see that it extracts the eventTimestamp and emits the
>>> watermark. Why can't I access this piece of information in the
>>> processElement() function?
>>>
>>> Any suggestions? Thank you so much!
>>>
>>>
>>> Best regards,
>>>
>>> Fuyao
>>>
>>>
>>>
>>> On 11/10/20 16:04, Kevin Kwon wrote:
>>>> Hi Fuyao, I think you need to implement your own
>>>> WatermarkStrategy class and register it with
>>>> window.assignTimestampsAndWatermarks(new YourEventWatermarkStrategy)
>>>>
>>>> Make sure you use the KafkaConsumer's assignTimestampsAndWatermarks
>>>> if you're using Kafka consumers.
>>>>
>>>> Example code for a booking event that has its own internal
>>>> timestamp would be:
>>>>
>>>> public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {
>>>>
>>>>     @Override
>>>>     public WatermarkGenerator<Booking> createWatermarkGenerator(
>>>>             WatermarkGeneratorSupplier.Context context) {
>>>>         return new WatermarkGenerator<Booking>() {
>>>>             private final long OUT_OF_ORDERNESS_MILLIS = 30;
>>>>             private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;
>>>>
>>>>             @Override
>>>>             public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
>>>>                 currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
>>>>                 Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
>>>>                 output.emitWatermark(watermark);
>>>>             }
>>>>
>>>>             @Override
>>>>             public void onPeriodicEmit(WatermarkOutput output) {
>>>>                 // Do nothing since a watermark is emitted on every event
>>>>             }
>>>>         };
>>>>     }
>>>>
>>>>     @Override
>>>>     public TimestampAssigner<Booking> createTimestampAssigner(
>>>>             TimestampAssignerSupplier.Context context) {
>>>>         return (booking, recordTimestamp) -> booking.getTimestamp();
>>>>     }
>>>> }
>>>>
>>>> On Wed, Nov 11, 2020 at 12:28 AM <fuyao.li@oracle.com> wrote:
>>>>
>>>> Hi Experts,
>>>>
>>>> I am trying to use to implement a KeyedProcessFunction with
>>>> onTimer()
>>>> callback. I need to use event time and I meet some problems
>>>> with making
>>>> the watermark available to my operator. I meet some strange
>>>> behaviors.
>>>>
>>>> I have a joined retracted stream without watermark or timestamp
>>>> information and i need to assign timestamps and watermarks to
>>>> it. The
>>>> timestamp is just a field in the stream. For the watermark
>>>> generator part.
>>>>
>>>> Problem:
>>>>
>>>> 1. I can use timelag watermark generator and make it work. But for
>>>> BoundedOutofOrdernessGenator, The
>>>> context.timerService().currentWatermark() in ProcessElement()
>>>> always
>>>> sticks to the initial setup and never updates.
>>>>
>>>> 2. I set the autoWatermark interval to 5 seconds for debug
>>>> purpose, I
>>>> only attach this watermark generator in one place with
>>>> parallelism 1.
>>>> However, I am getting 8 records at a time. timelag policy will
>>>> advance
>>>> all 8 records, outOfOrderness policy will only advance 1
>>>> records. Maybe
>>>> the mismatch is causing the processElement() to capture the wrong
>>>> default watermark?
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>>>> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!IsqeiKPZU2RRj3KIBPhxTUiQ5GxzTU3oVWXMzZA1E3fghyT4OllCyHMAsIUclfM$>
>>>>
>>>> This is my code for watermark generator:
>>>>
>>>> @Slf4j
>>>> public class PeriodicTableOutputWatermarkGenerator implements
>>>> WatermarkGenerator<Tuple2<Boolean, Row>> {
>>>> private final long maxTimeLag = 15000;
>>>> private transient long currentMaxTimestamp = 15000;
>>>> @Override
>>>> public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2,
>>>> long
>>>> eventTimestamp, WatermarkOutput output) {
>>>> // the eventTimestamp is get through TimestampAssigner
>>>> //
>>>> https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
>>>> <https://urldefense.com/v3/__https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java__;!!GqivPVa7Brio!IsqeiKPZU2RRj3KIBPhxTUiQ5GxzTU3oVWXMzZA1E3fghyT4OllCyHMAQmdMGjw$>
>>>> currentMaxTimestamp = Math.max(eventTimestamp,
>>>> currentMaxTimestamp);
>>>> log.info("eventTimestamp
>>>> in onEvent method: {}", eventTimestamp);
>>>> }
>>>>
>>>> @Override
>>>> public void onPeriodicEmit(WatermarkOutput output) {
>>>> // Policy 1: timelag strategy, can work and advance
>>>> the timestamp
>>>> long watermarkEpochTime =
>>>> Math.max(System.currentTimeMillis() -
>>>> maxTimeLag, currentMaxTimestamp);
>>>> output.emitWatermark(new Watermark(watermarkEpochTime));
>>>>
>>>> // Policy 2: periodic emit based on event
>>>> long periodicEmitWatermarkTime = currentMaxTimestamp -
>>>> maxTimeLag;
>>>> // output.emitWatermark(new
>>>> Watermark(periodicEmitWatermarkTime));
>>>>
>>>> log.info("Emit
>>>> Watermark: watermark based on system time: {},
>>>> periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}"
>>>> , watermarkEpochTime, periodicEmitWatermarkTime,
>>>> currentMaxTimestamp);
>>>> }
>>>> }
>>>>
>>>>
>>>> This is my log printed by the slf4j log above. Every time, it
>>>> will give
>>>> me 8 records, why it is 8 records? I think it should be 1 in
>>>> theory. I
>>>> am very confused. Also, the policy 1 is advancing all 8
>>>> records. Policy
>>>> 2 is advancing 1 of the 8 records and not reflected in
>>>> processElement().
>>>>
>>>> 14:28:01,199 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047266198,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:01,199 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047266199,
>>>> periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
>>>> 1605047187881
>>>> 14:28:01,199 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047266199,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:01,199 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047266198,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:01,199 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047266198,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:01,199 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047266198,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:01,199 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047266198,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:01,199 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047266198,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>>
>>>> 14:28:06,200 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047271200,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:06,200 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047271200,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:06,200 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047271200,
>>>> periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
>>>> 1605047187881
>>>> 14:28:06,200 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047271200,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:06,200 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047271200,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:06,200 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047271200,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:06,200 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047271200,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>> 14:28:06,200 INFO
>>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>>> -
>>>> Emit Watermark: watermark based on system time: 1605047271200,
>>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>>
>>>>
>>>> Any insights? Thank you very much!
>>>>
>>>>
>>>> Best,
>>>>
>>>> Fuyao
>>>>
Re: BoundedOutOfOrderness Watermark Generator is NOT making the event
time to advance
Posted by fu...@oracle.com.
Hi Community,
Regarding this problem, could someone give me an explanation? Thanks.
Best,
Fuyao
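
A note on the two questions in this thread (why eight log lines appear per tick, and why processElement() does not see the event-based watermark). Eight lines per 5-second interval are consistent with eight parallel instances of the generator, since in the posted pipeline setParallelism(1) is applied to the sink only; that reading is an assumption, not something confirmed in these messages. Flink advances an operator's event-time clock to the minimum of the watermarks received from all of its inputs, so instances that never see an event can hold the combined watermark back. The following plain-Java sketch (no Flink dependency; the class and method names are hypothetical) replays the values from the posted log under that assumption:

```java
import java.util.Arrays;

public class MinWatermarkSketch {

    // A downstream operator's watermark is the minimum over the latest
    // watermark received from each upstream subtask.
    static long combinedWatermark(long[] perSubtaskWatermarks) {
        return Arrays.stream(perSubtaskWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // Policy 1 from the posted generator: wall clock minus lag, or the
    // largest event timestamp seen, whichever is larger.
    static long timeLagWatermark(long nowMillis, long maxTimeLag, long currentMaxTimestamp) {
        return Math.max(nowMillis - maxTimeLag, currentMaxTimestamp);
    }

    // Policy 2 from the posted generator: largest event timestamp minus lag.
    static long eventBasedWatermark(long maxTimeLag, long currentMaxTimestamp) {
        return currentMaxTimestamp - maxTimeLag;
    }

    public static void main(String[] args) {
        long lag = 15000L;
        long now = 1605047281198L; // wall clock implied by the first log line

        // Assumption: 8 generator instances, only one of which has seen events.
        long[] maxSeen = new long[8];
        Arrays.fill(maxSeen, 15000L);   // initial currentMaxTimestamp
        maxSeen[1] = 1605047187881L;    // the single active instance

        long[] policy1 = new long[8];
        long[] policy2 = new long[8];
        for (int i = 0; i < maxSeen.length; i++) {
            policy1[i] = timeLagWatermark(now, lag, maxSeen[i]);
            policy2[i] = eventBasedWatermark(lag, maxSeen[i]);
        }

        // prints "policy 1 combined: 1605047266198"
        System.out.println("policy 1 combined: " + combinedWatermark(policy1));
        // prints "policy 2 combined: 0"
        System.out.println("policy 2 combined: " + combinedWatermark(policy2));
    }
}
```

Under these assumptions, Policy 1 advances because every instance follows the wall clock, while Policy 2 stays at the initial value because seven instances never move. If that matches the real job, two directions worth checking are the parallelism of the operator the generator is attached to, and marking idle inputs with WatermarkStrategy.withIdleness(...).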
On 11/10/20 16:56, fuyao.li@oracle.com wrote:
>
> Hi Kevin,
>
> Sorry for the name typo...
>
> On 11/10/20 16:48, fuyao.li@oracle.com wrote:
>>
>> Hi Kavin,
>>
>> Thanks for your example. I think I have already done something very
>> similar. I didn't post the full WatermarkStrategy implementation in
>> my previous email, but I do have that part already. I think the
>> example you gave me is a punctuated watermark strategy, not a
>> bounded-out-of-orderness one. My main concern now is why my emitted
>> watermark is not available in processElement(), and why I get 8
>> log records each time the code reaches onPeriodicEmit(). I will post
>> my code, following your example, below.
>>
>> The symptom is that the context watermark is always Long.MIN_VALUE
>> when I use the watermark strategy below.
>>
>> 16:35:12,969 INFO
>> org.myorg.quickstart.processor.TableOutputProcessFunction - context
>> current key: 69215, context current watermark: -9223372036854775808
>>
>>
>> DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
>> retractStream
>>     .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
>>     .keyBy(value -> {
>>         String invoice_id_key = (String) value.f1.getField(0);
>>         if (invoice_id_key == null) {
>>             invoice_id_key = (String) value.f1.getField(4);
>>         }
>>         return invoice_id_key;
>>     })
>>     .process(new TableOutputProcessFunction())
>>     .name("ProcessTableOutput")
>>     .uid("ProcessTableOutput")
>>     .addSink(businessObjectSink)
>>     .name("businessObjectSink")
>>     .uid("businessObjectSink")
>>     .setParallelism(1);
>>
>> watermark strategy:
>>
>> public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
>>     @Override
>>     public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>         return new PunctuatedTableOutputWatermarkGenerator();
>>     }
>>
>>     @Override
>>     public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
>>         log.info("Inside timestamp assigner");
>>         return (booleanRowTuple2, previousElementTimestamp) -> {
>>             return my timestamp;
>>         };
>>     }
>> }
>>
>> watermark generator code:
>>
>> public class PunctuatedTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
>>     @Override
>>     public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput watermarkOutput) {
>>         watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
>>         log.info("Emit Punctuated watermark: {}", eventTimestamp);
>>     }
>>
>>     @Override
>>     public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>         // don't need to do anything because we emit in reaction to events above
>>     }
>> }
>>
>> 16:35:13,584 INFO
>> org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator
>> - Emit Punctuated watermark: 1605054900905
>>
>> From the log, I can see that it extracts the eventTimestamp and emits
>> the watermark. Why can't I access this watermark in the
>> processElement() function?
>>
>> Any suggestions? Thank you so much!
>>
>>
>> Best regards,
>>
>> Fuyao
>>
>>
>>
>> On 11/10/20 16:04, Kevin Kwon wrote:
>>> Hi Fuyao, I think you need to implement your own WatermarkStrategy
>>> class and register it with
>>> window.assignTimestampsAndWatermarks(new YourEventWatermarkStrategy())
>>>
>>> Make sure you use the KafkaConsumer's assignTimestampsAndWatermarks if
>>> you're using Kafka consumers.
>>>
>>> An example for a booking event that has its own internal timestamp
>>> would be:
>>>
>>> public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {
>>>
>>>     @Override
>>>     public WatermarkGenerator<Booking> createWatermarkGenerator(
>>>             WatermarkGeneratorSupplier.Context context) {
>>>         return new WatermarkGenerator<Booking>() {
>>>             private final long OUT_OF_ORDERNESS_MILLIS = 30;
>>>             private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;
>>>
>>>             @Override
>>>             public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
>>>                 currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
>>>                 Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
>>>                 output.emitWatermark(watermark);
>>>             }
>>>
>>>             @Override
>>>             public void onPeriodicEmit(WatermarkOutput output) {
>>>                 // Do nothing since watermark will be emitted every event
>>>             }
>>>         };
>>>     }
>>>
>>>     @Override
>>>     public TimestampAssigner<Booking> createTimestampAssigner(
>>>             TimestampAssignerSupplier.Context context) {
>>>         return (booking, recordTimestamp) -> booking.getTimestamp();
>>>     }
>>> }
>>>
>>> On Wed, Nov 11, 2020 at 12:28 AM <fuyao.li@oracle.com> wrote:
>>>
>>> Hi Experts,
>>>
>>> I am trying to use to implement a KeyedProcessFunction with
>>> onTimer()
>>> callback. I need to use event time and I meet some problems with
>>> making
>>> the watermark available to my operator. I meet some strange
>>> behaviors.
>>>
>>> I have a joined retracted stream without watermark or timestamp
>>> information and i need to assign timestamps and watermarks to
>>> it. The
>>> timestamp is just a field in the stream. For the watermark
>>> generator part.
>>>
>>> Problem:
>>>
>>> 1. I can use timelag watermark generator and make it work. But for
>>> BoundedOutofOrdernessGenator, The
>>> context.timerService().currentWatermark() in ProcessElement()
>>> always
>>> sticks to the initial setup and never updates.
>>>
>>> 2. I set the autoWatermark interval to 5 seconds for debug
>>> purpose, I
>>> only attach this watermark generator in one place with
>>> parallelism 1.
>>> However, I am getting 8 records at a time. timelag policy will
>>> advance
>>> all 8 records, outOfOrderness policy will only advance 1
>>> records. Maybe
>>> the mismatch is causing the processElement() to capture the wrong
>>> default watermark?
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>>>
>>> This is my code for watermark generator:
>>>
>>> @Slf4j
>>> public class PeriodicTableOutputWatermarkGenerator implements
>>> WatermarkGenerator<Tuple2<Boolean, Row>> {
>>> private final long maxTimeLag = 15000;
>>> private transient long currentMaxTimestamp = 15000;
>>> @Override
>>> public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2,
>>> long
>>> eventTimestamp, WatermarkOutput output) {
>>> // the eventTimestamp is get through TimestampAssigner
>>> //
>>> https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
>>> currentMaxTimestamp = Math.max(eventTimestamp,
>>> currentMaxTimestamp);
>>> log.info("eventTimestamp
>>> in onEvent method: {}", eventTimestamp);
>>> }
>>>
>>> @Override
>>> public void onPeriodicEmit(WatermarkOutput output) {
>>> // Policy 1: timelag strategy, can work and advance the
>>> timestamp
>>> long watermarkEpochTime =
>>> Math.max(System.currentTimeMillis() -
>>> maxTimeLag, currentMaxTimestamp);
>>> output.emitWatermark(new Watermark(watermarkEpochTime));
>>>
>>> // Policy 2: periodic emit based on event
>>> long periodicEmitWatermarkTime = currentMaxTimestamp -
>>> maxTimeLag;
>>> // output.emitWatermark(new
>>> Watermark(periodicEmitWatermarkTime));
>>>
>>> log.info("Emit
>>> Watermark: watermark based on system time: {},
>>> periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}"
>>> , watermarkEpochTime, periodicEmitWatermarkTime,
>>> currentMaxTimestamp);
>>> }
>>> }
>>>
>>>
>>> This is my log printed by the slf4j log above. Every time, it
>>> will give
>>> me 8 records, why it is 8 records? I think it should be 1 in
>>> theory. I
>>> am very confused. Also, the policy 1 is advancing all 8 records.
>>> Policy
>>> 2 is advancing 1 of the 8 records and not reflected in
>>> processElement().
>>>
>>> 14:28:01,199 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047266198,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:01,199 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047266199,
>>> periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
>>> 1605047187881
>>> 14:28:01,199 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047266199,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:01,199 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047266198,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:01,199 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047266198,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:01,199 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047266198,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:01,199 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047266198,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:01,199 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047266198,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>
>>> 14:28:06,200 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047271200,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:06,200 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047271200,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:06,200 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047271200,
>>> periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
>>> 1605047187881
>>> 14:28:06,200 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047271200,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:06,200 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047271200,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:06,200 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047271200,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:06,200 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047271200,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>> 14:28:06,200 INFO
>>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>>> -
>>> Emit Watermark: watermark based on system time: 1605047271200,
>>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>>
>>>
>>> Any insights? Thank you very much!
>>>
>>>
>>> Best,
>>>
>>> Fuyao
>>>
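
For reference, the bounded-out-of-orderness arithmetic used in Kevin's quoted example can be isolated from Flink and run on its own. The sketch below is plain Java with hypothetical class and method names; it only mirrors the formula from that example (watermark = largest timestamp seen, minus the bound, minus 1) and the underflow-safe initial value. It also shows that a generator instance which never receives an event keeps reporting Long.MIN_VALUE, the same value as the "context current watermark: -9223372036854775808" line in the log above, although whether that is the cause in the posted job is an assumption.

```java
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start just high enough that the subtraction in currentWatermark()
        // yields Long.MIN_VALUE instead of underflowing.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    // Track the largest event timestamp seen so far; late events are ignored.
    void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    // Watermark trails the largest timestamp by the allowed out-of-orderness.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(15000L);

        // No events yet: prints -9223372036854775808 (Long.MIN_VALUE).
        System.out.println(generator.currentWatermark());

        generator.onEvent(1605047187881L);
        generator.onEvent(1605047180000L); // out-of-order event, does not move the maximum

        // prints 1605047172880
        System.out.println(generator.currentWatermark());
    }
}
```

Compared with the posted PeriodicTableOutputWatermarkGenerator, the only differences are the initial value (Long.MIN_VALUE plus the bound plus 1, instead of 15000) and the extra "- 1" in the emitted watermark.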