Posted to user@flink.apache.org by HG <ha...@gmail.com> on 2022/03/16 09:05:37 UTC

Watermarks event time vs processing time

Hi,

I read events in JSON format from a Kafka topic.
These events contain a handling time (aka event time) in epoch milliseconds,
a transaction_id and a large nested JSON structure.
I need to group the events by transaction_id, order them by handling time
and calculate the differences in handling time.
The events are updated with this calculated elapsed time and pushed
further.
So all events that go in should come out with the elapsed time added.

For testing I use events that are old (so the handling time is nowhere near
the wall clock time).
Initially I used EventTimeSessionWindows, but somehow the processing did not
run as expected.
When I pushed 1000 events, eventually only 800 or so would appear at the output.
This was resolved by switching to ProcessingTimeSessionWindows.
My thought was then that I could remove the watermark strategies (with
handling time as the timestamp assigner) for the Kafka input
stream and the data stream.
However, this was not the case.

Can anyone enlighten me as to why the watermark strategies are still needed?

The code is below:

        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setProperties(kafkaProps)
                .setProperty("ssl.truststore.type", trustStoreType)
                .setProperty("ssl.truststore.password", trustStorePassword)
                .setProperty("ssl.truststore.location", trustStoreLocation)
                .setProperty("security.protocol", securityProtocol)
                .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
                .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
                .setGroupId(inputGroupId)
                .setClientIdPrefix(clientId)
                .setTopics(kafkaInputTopic)
                .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();

        /* A watermark is needed to prevent duplicates! */
        WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
                .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
                .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
                .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
                    @Override
                    public long extractTimestamp(ObjectNode element, long eventTime) {
                        return element.get("value").get("handling_time").asLong();
                    }
                });

        /* Use the watermark strategy to create a datastream */
        DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");

        /* Split the ObjectNode into a Tuple4 */
        DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());

        WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy = WatermarkStrategy
                .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
                .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
                    @Override
                    public long extractTimestamp(Tuple4<Long, Long, String, String> element, long eventTime) {
                        return element.f0;
                    }
                });

        DataStream<Tuple4<Long, Long, String, String>> tuple4dswm = tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

        DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
            .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
                @Override
                public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
                    return value.f2;
                }
            })
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
            .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
            .process(new MyProcessWindowFunction());


        KafkaSink<String> kSink = KafkaSink.<String>builder()
                .setBootstrapServers(outputBrokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(kafkaOutputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Sink to the Kafka topic
        tuple4DsWmKeyedbytr.sinkTo(kSink);

RE: Watermarks event time vs processing time

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Hanspeter,

Event time mode should work just the same … for your example below you need only one single arbitrary event per Kafka partition that has a timestamp > 1646992800560 + sessionWindowGap + outOfOrderness in order for the session window to be triggered.
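
As a rough illustration of that bound, here is a plain-Java sketch of the arithmetic, with no Flink dependencies. It assumes that the bounded-out-of-orderness generator emits maxTimestamp - outOfOrderness - 1 and that an event-time window [start, end) fires once the watermark reaches end - 1, which is how the Flink defaults are usually described. The class name and the 5 s gap / 2 s out-of-orderness values are invented for the example; the real values come from the job configuration.

```java
/** Hypothetical helper, not part of the job above: checks when an event-time session would fire. */
final class SessionTriggerSketch {

    /** Watermark assumed to be emitted after the largest timestamp seen so far. */
    static long watermark(long maxTimestampMs, long outOfOrdernessMs) {
        return maxTimestampMs - outOfOrdernessMs - 1;
    }

    /** A session ending at lastEvent + gap is assumed to fire once the watermark reaches end - 1. */
    static boolean sessionFires(long lastEventMs, long gapMs, long watermarkMs) {
        long windowEnd = lastEventMs + gapMs;
        return watermarkMs >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long last = 1646992800560L; // last handling_time of the sample transaction
        long gap = 5_000L;          // assumed sessionWindowGap in ms
        long ooo = 2_000L;          // assumed outOfOrderness in ms

        // No further events arrive: the watermark stays behind the window end.
        System.out.println(sessionFires(last, gap, watermark(last, ooo)));  // false

        // A later event (any key) pushes the watermark far enough.
        long later = last + gap + ooo;
        System.out.println(sessionFires(last, gap, watermark(later, ooo))); // true
    }
}
```

Under these assumptions, once the input stops after the last test event, nothing advances the watermark and the final sessions stay open, which would match events going missing at the output.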

I’m not sure why the processing time window does not work without watermarking configured (I never use processing time mode).
You need to consider what consistency guarantees you need in processing time mode: in case the job fails and is restarted (or if network I/O exhibits short hiccups beyond your session gap), you might get results that split a single transaction_id into multiple session windows …
The choice is yours 😊
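
To make the splitting risk concrete, here is a small plain-Java sketch (no Flink dependencies) that groups the arrival times of one transaction_id into sessions. It assumes that two events end up in the same session unless the pause between their arrivals is longer than the gap; the class name and all numbers are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: simulates processing-time session grouping for a single key. */
final class ProcessingTimeSessions {

    /** Splits ascending arrival times (ms) into sessions; a pause longer than gapMs starts a new one. */
    static List<List<Long>> split(long[] arrivalMs, long gapMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = null;
        long previous = 0L;
        for (long t : arrivalMs) {
            if (current == null || t - previous > gapMs) {
                current = new ArrayList<>();
                sessions.add(current);
            }
            current.add(t);
            previous = t;
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Four events of one transaction; the last one is delayed by a 6 s hiccup.
        long[] arrivals = {0L, 100L, 200L, 6_200L};
        System.out.println(split(arrivals, 5_000L)); // [[0, 100, 200], [6200]]
    }
}
```

With a 5 s gap, the delayed fourth event lands in its own session, so its elapsed time would be computed without the three earlier events.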

As to the aggregation method: you compute current event time minus previous event time rather than min/max … otherwise it is no different 😊

If you want to find out why event time mode blocks, you might find it useful to monitor the watermarks on single operators / per subtask:
Look for subtasks that don’t have watermarks, or whose watermarks are too low for a specific session window to trigger.


Thias


From: HG <ha...@gmail.com>
Sent: Wednesday, 16 March 2022 16:41
To: Schwalbe Matthias <Ma...@viseca.ch>
Cc: user <us...@flink.apache.org>
Subject: Re: Watermarks event time vs processing time


Hi Matthias and others

Thanks for the answer.
I will remove the Idleness.
However I am not doing max/min etc. Unfortunately most examples are about aggregations.

The inputs are like this
{"handling_time":1646992800260,"transaction_id":"0000017f6af1548e-119dfb",........}
{"handling_time":1646992800290,"transaction_id":"0000017f6af1548e-119dfb",........}
{"handling_time":1646992800360,"transaction_id":"0000017f6af1548e-119dfb",........}
{"handling_time":1646992800560,"transaction_id":"0000017f6af1548e-119dfb",........}
The output like this
{"handling_time":1646992800260,"transaction_id":"0000017f6af1548e-119dfb","elapse":0,........}
{"handling_time":1646992800290,"transaction_id":"0000017f6af1548e-119dfb","elapse":30,........}
{"handling_time":1646992800360,"transaction_id":"0000017f6af1548e-119dfb","elapse":70,........}
{"handling_time":1646992800560,"transaction_id":"0000017f6af1548e-119dfb","elapse":200,........}
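
For reference, the elapse values above can be reproduced with a few lines of plain Java. This is only a sketch of the calculation, not the MyProcessWindowFunction from the job (which is not shown in this thread); the class and method names are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical helper: computes the elapsed time between consecutive events of one transaction. */
final class ElapsedSketch {

    /** Sorts the handling times ascending and returns the difference to the previous event (0 for the first). */
    static long[] elapsed(long[] handlingTimes) {
        long[] sorted = handlingTimes.clone();
        Arrays.sort(sorted);
        long[] result = new long[sorted.length];
        for (int i = 1; i < sorted.length; i++) {
            result[i] = sorted[i] - sorted[i - 1];
        }
        return result;
    }

    public static void main(String[] args) {
        // The four sample handling times, deliberately given out of order.
        long[] times = {1646992800360L, 1646992800260L, 1646992800560L, 1646992800290L};
        System.out.println(Arrays.toString(elapsed(times))); // [0, 30, 70, 200]
    }
}
```

The result is aligned with the sorted order of the events, so the first value belongs to the earliest handling_time.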

I started with handling_time as the timestamp, but that did not work out well. I don't know why.
Then I switched to processing time session windows, which is also OK because the outcome of the elapsed time calculation does not rely on the event time.

Then I thought, 'let me remove the Kafka watermark assigner'.
But as soon as I did that, no events would appear at the sink.
So I left both watermark timestamp assigners in place.
They seem to do no harm, while leaving them out appears to. It is not ideal, but it works.
But I'd rather know the correct way to set it up.

Regards Hans-Peter








On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias <Ma...@viseca.ch> wrote:
Hi Hanspeter,

Let me relate some hints that might help you get the concepts clearer.

From your description I make the following assumptions where you are not specific enough (please confirm or correct in your answer):

a.       You store incoming events in state per transaction_id to be sorted/aggregated (min/max time) by event time later on

b.       So far you used a session window to determine the point in time when to emit the stored/enriched/sorted events

c.        Watermarks are generated with bounded out-of-orderness

d.       You use session windows with a specific gap

e.       In your experiment you only ever send 1000 events and then stop producing incoming events

Now to your questions:

  *   For processing time session windows, watermarks play no role whatsoever; you simply assume that you’ve seen all events belonging to a single transaction id if the last such event for that transaction id was processed sessionWindowGap milliseconds ago
  *   Therefore you see all enriched incoming events at the latest sessionWindowGap ms after the last incoming event (+ some latency)
  *   In event time mode with event time session windows the situation is exactly the same, only that processing time plays no role
  *   A watermark means (ideally) that no event older than the watermark time ever follows the watermark (which itself is a meta-event that flows with the proper events on the same channels)
  *   In order for a session gap window to forward the enriched events, the window operator needs to receive a watermark that is sessionWindowGap milliseconds beyond the latest incoming event (in terms of the respective event time)
  *   In order to generate a new watermark that triggers this last session window, the watermark generator needs to encounter an (any) event that has a timestamp of (<latest event in session window> + outOfOrderness + sessionWindowGap + 1ms)
  *   Remember, the watermark generator never generates watermarks based on processing time, but only based on the timestamps it has seen in events actually encountered
  *   Coming back to your idleness configuration: it only means that the incoming stream becomes idle == timeless after a while … i.e. watermarks won’t make progress from this stream, and it tells all downstream operators so
  *   An idleness specification is only useful if the respective operator has another source of valid watermarks (e.g. after a union of two streams, one active/one idle …). This is not your case
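
The last two points can be sketched in plain Java (no Flink dependencies). The sketch assumes that an operator with several inputs advances its own watermark to the minimum over the inputs that are not marked idle, which is how the behaviour is commonly described; the class and method names are hypothetical.

```java
import java.util.OptionalLong;

/** Hypothetical helper: combines the watermarks of several inputs, skipping idle ones. */
final class CombinedWatermark {

    /** Returns the minimum watermark over all non-idle inputs, or empty if every input is idle. */
    static OptionalLong combined(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] wm = {100L, 50L, 70L};

        // All inputs active: the slowest input holds everything back.
        System.out.println(combined(wm, new boolean[]{false, false, false}).getAsLong()); // 50

        // The slowest input is idle: the remaining inputs decide.
        System.out.println(combined(wm, new boolean[]{false, true, false}).getAsLong());  // 70

        // Every input idle: nothing is left to drive the watermark forward.
        System.out.println(combined(wm, new boolean[]{true, true, true}).isPresent());    // false
    }
}
```

The third case is the situation in the experiment once the producer stops: marking the only stream idle does not create a watermark that could close the last session windows.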

I hope this clarifies your situation.

Cheers


Thias



Re: Watermarks event time vs processing time

Posted by HG <ha...@gmail.com>.
Hello Matthias,

I am still using ProcessingTimeSessionWindows.
But it turns out I was wrong.
I tested a couple of times and it did not seem to work.
But now it does, with both watermark strategies removed.

My apologies.
Regards Hans-Peter

This is the code:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(Integer.parseInt(envMaxParallelism));
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.enableCheckpointing(Integer.parseInt(envEnableCheckpointing));


        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, inputBrokers);
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, inputGroupId);
        kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        kafkaProps.setProperty("ssl.truststore.type", inputTrustStoreType);
        kafkaProps.setProperty("ssl.truststore.password", inputTrustStorePassword);
        kafkaProps.setProperty("ssl.truststore.location", inputTrustStoreLocation);
        kafkaProps.setProperty("security.protocol", inputSecurityProtocol);
        kafkaProps.setProperty("ssl.enabled.protocols", inputSslEnabledProtocols);

        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setProperties(kafkaProps)
                .setGroupId(inputGroupId)
                .setClientIdPrefix(clientId)
                .setTopics(kafkaInputTopic)
                .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();


        /* Create the datastream; no watermarks are generated for processing-time windows */
        DataStream<ObjectNode> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        /* Split the ObjectNode into a Tuple4 */
        DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());

        DataStream<String> tuple4DsWmKeyedbytr = tuple4ds
            .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
                @Override
                public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
                    return value.f2;
                }
            })
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
            .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
            .process(new MyProcessWindowFunction());



        Properties sinkkafkaProps = new Properties();
        sinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outputBrokers);
        sinkkafkaProps.setProperty("ssl.truststore.type", outputTrustStoreType);
        sinkkafkaProps.setProperty("ssl.truststore.location", outputTrustStoreLocation);
        sinkkafkaProps.setProperty("ssl.truststore.password", outputTrustStorePassword);
        sinkkafkaProps.setProperty("security.protocol", outputSecurityProtocol);
        sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
        sinkkafkaProps.setProperty("ssl.enabled.protocols", outputSslEnabledProtocols);


        KafkaSink<String> kSink = KafkaSink.<String>builder()
                .setBootstrapServers(outputBrokers)
                .setKafkaProducerConfig(sinkkafkaProps)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(kafkaOutputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Sink to the Kafka topic
        tuple4DsWmKeyedbytr.sinkTo(kSink);


    // Splits the ObjectNode into a Tuple4
    private static class Splitter implements FlatMapFunction<ObjectNode, Tuple4<Long, Long, String, String>> {
        @Override
        public void flatMap(ObjectNode json, Collector<Tuple4<Long, Long, String, String>> out) throws Exception {
            // handling_time is retrieved twice intentionally: one copy is meant for the
            // watermark strategy, the other for the calculation of the elapsed time
            out.collect(new Tuple4<Long, Long, String, String>(
                    json.get("value").get("handling_time").asLong(),
                    json.get("value").get("handling_time").asLong(),
                    json.get("value").get("transaction_id").asText(),
                    json.get("value").get("original_event").toPrettyString()));
        }
    }

    // Class to sort the events that belong to the same transaction
    public static class SortEventsHandlingTime implements Comparator<Tuple4<Long, Long, String, String>> {

        // Compare two Tuple4 objects by handling time (field f0), ascending
        @Override
        public int compare(Tuple4<Long, Long, String, String> o1, Tuple4<Long, Long, String, String> o2) {
            return Long.compare(o1.f0, o2.f0);
        }
    }

    // Sorts the events and calculates the elapsed times
    static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out) throws JsonProcessingException {
            Long elapsed       = 0L;
            Long pHandlingTime = 0L;
            Long cumulativeElapsed  = 0L;

            // Copy the window contents into a list and sort it by handling time
            List<Tuple4<Long, Long, String, String>> inputList = new ArrayList<>();
            input.forEach(inputList::add);
            inputList.sort(new SortEventsHandlingTime());

            ObjectMapper mapper = new ObjectMapper();

            for (Tuple4<Long, Long, String, String> in: inputList){

                // The first event of a transaction has no predecessor, so its elapsed time is 0
                if (pHandlingTime.equals(0L)) {
                    elapsed       = 0L;
                } else {
                    elapsed       = Long.parseLong(in.getField(0).toString()) - pHandlingTime;
                }
                cumulativeElapsed  = cumulativeElapsed + elapsed;
                pHandlingTime = Long.parseLong(in.getField(0).toString());

                JsonNode originalEvent = mapper.readTree(in.getField(3).toString());

                // Cast to ObjectNode so the calculated fields can be added
                ObjectNode o = (ObjectNode) originalEvent.get("Message").get("endpoints").get(0).get("endpoint_handlers").get(0);
                o.put("handling_time", in.getField(0).toString());
                o.put("elapsed_time", elapsed.toString());
                o.put("cumulative_elapsed_time", cumulativeElapsed.toString());

                out.collect(((ObjectNode) originalEvent).toString());
            }
        }
    }


On Tue, 29 Mar 2022 at 15:23, Schwalbe Matthias <
Matthias.Schwalbe@viseca.ch> wrote:

> Hello Hans-Peter,
>
>
>
> I’m a little confused about which version of your code you are testing against:
>
>    - ProcessingTimeSessionWindows or EventTimeSessionWindows?
>    - did you keep the withIdleness() ??
>
>
>
> As said before:
>
>    - for ProcessingTimeSessionWindows, watermarks play no role
>    - if you keep withIdleness(), then the respective sparse DataStream is
>    event-time-less most of the time, i.e. no triggers fire to close a session
>    window
>    - withIdleness() only makes sense if you merge/union/connect multiple
>    DataStreams where at least one stream has its watermarks updated regularly
>    (i.e. it is not withIdleness())
>       - this is not your case, your DAG is linear, no union nor connects
>    - in event-time mode processing time plays no role, watermarks
>    exclusively take the role of the progress of model (event) time and hence
>    the triggering of windows
>    - in order to trigger a (session-)window at time A the window operator
>    needs to receive a watermark of at least time A
>    - next catch regards partitioning
>       - your first watermark strategy kafkaWmstrategy generates
>       per-Kafka-partition watermarks
>       - a keyBy() reshuffles these partitions onto the number of subtasks
>       according to the hash of the key
>       - this results in a per subtask calculation of the *lowest*
>       watermark of all Kafka partitions that happen to be processed by that
>       subtask
>       - i.e. if a single Kafka partition makes no watermark progress the
>       subtask watermark makes no progress
>       - this surfaces in sparse data as in your case
>    - your second watermark strategy wmStrategy makes things worse because
>       - it discards the correct watermarks of the first watermark strategy
>       - and replaces it with something that is arbitrary (at this point
>       it is hard to guess the correct max lateness that is a mixture of the
>       events from multiple Kafka partitions)
>
>
>
> Conclusion:
>
> The only way to make the event time session windows work for you in a
> timely manner is to make sure watermarks on all involved partitions make
> progress, i.e. new events arrive on all partitions in a regular manner.
>
>
>
> Hope this helps
>
>
>
> Thias

RE: Watermarks event time vs processing time

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hello Hans-Peter,

I’m a little confused about which version of your code you are testing against:

  *   ProcessingTimeSessionWindows or EventTimeSessionWindows?
  *   Did you keep the withIdleness()?

As said before:

  *   For ProcessingTimeSessionWindows, watermarks play no role.
  *   If you keep withIdleness(), then the respective sparse DataStream is event-time-less most of the time, i.e. no triggers fire to close a session window.
  *   withIdleness() only makes sense if you merge/union/connect multiple DataStreams where at least one stream has its watermarks updated regularly (i.e. it is not withIdleness()).
     *   This is not your case: your DAG is linear, with no unions or connects.
  *   In event-time mode, processing time plays no role; watermarks exclusively take the role of the progress of model (event) time and hence the triggering of windows.
  *   In order to trigger a (session) window at time A, the window operator needs to receive a watermark of at least time A.
  *   The next catch regards partitioning:
     *   Your first watermark strategy, kafkaWmstrategy, generates per-Kafka-partition watermarks.
     *   A keyBy() reshuffles these partitions onto the subtasks according to the hash of the key.
     *   This results in a per-subtask calculation of the lowest watermark of all Kafka partitions that happen to be processed by that subtask,
     *   i.e. if a single Kafka partition makes no watermark progress, the subtask watermark makes no progress.
     *   This surfaces with sparse data, as in your case.
  *   Your second watermark strategy, wmStrategy, makes things worse because
     *   it discards the correct watermarks of the first watermark strategy
     *   and replaces them with something arbitrary (at this point it is hard to guess the correct max lateness for a mixture of events from multiple Kafka partitions).

Conclusion:
The only way to make the event time session windows work for you in a timely manner is to make sure watermarks on all involved partitions make progress, i.e. new events arrive on all partitions in a regular manner.
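
As a rough illustration of the partitioning catch, here is a minimal sketch in plain Java. It is a simplified model and not Flink's actual implementation: the class SubtaskWatermarkSketch, the Input type and the method combinedWatermark are hypothetical names made up for this example. It only shows the rule described above, namely that a subtask advances to the lowest watermark of its non-idle inputs, so one stalled partition holds everything back.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

public class SubtaskWatermarkSketch {

    /** One input of a subtask, e.g. one Kafka partition (hypothetical type for this sketch). */
    public static final class Input {
        final String name;
        final long watermark;
        final boolean idle;

        public Input(String name, long watermark, boolean idle) {
            this.name = name;
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /** Lowest watermark among the non-idle inputs; empty if every input is idle. */
    public static OptionalLong combinedWatermark(List<Input> inputs) {
        return inputs.stream()
                .filter(in -> !in.idle)
                .mapToLong(in -> in.watermark)
                .min();
    }

    public static void main(String[] args) {
        // partition-1 received no new events, so its watermark is stuck at 1000
        List<Input> stalled = Arrays.asList(
                new Input("partition-0", 5_000L, false),
                new Input("partition-1", 1_000L, false));
        System.out.println(combinedWatermark(stalled)); // OptionalLong[1000]

        // If partition-1 is marked idle, it no longer holds back the others
        List<Input> oneIdle = Arrays.asList(
                new Input("partition-0", 5_000L, false),
                new Input("partition-1", 1_000L, true));
        System.out.println(combinedWatermark(oneIdle)); // OptionalLong[5000]

        // If every input is idle, there is no watermark to advance to at all
        List<Input> allIdle = Arrays.asList(
                new Input("partition-0", 5_000L, true),
                new Input("partition-1", 1_000L, true));
        System.out.println(combinedWatermark(allIdle)); // OptionalLong.empty
    }
}
```

The last case corresponds to the linear DAG discussed above: when the only stream goes idle, nothing is left to drive event time forward, so no event-time trigger fires.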

Hope this helps

Thias


From: HG <ha...@gmail.com>
Sent: Tuesday, March 29, 2022 1:07 PM
To: Schwalbe Matthias <Ma...@viseca.ch>
Cc: user <us...@flink.apache.org>
Subject: Re: Watermarks event time vs processing time

Hello Matthias,

When I remove all the watermark strategies, it does not process anything.
For example, when I use WatermarkStrategy.noWatermarks() instead of the one I built, nothing seems to happen at all.

 Also when I skip the part where I add wmStrategy  to create tuple4dswm:
 DataStream<Tuple4<Long, Long, String, String>> tuple4dswm = tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

Nothing is processed.

Regards Hans-Peter

On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias <Ma...@viseca.ch> wrote:
Hi Hanspeter,

Let me relate some hints that might help make the concepts clearer.

From your description I make the following assumptions where you are not specific enough (please confirm or correct in your answer):

a. You store incoming events in state per transaction_id, to be sorted/aggregated (min/max time) by event time later on

b. So far you have used a session window to determine the point in time when to emit the stored/enriched/sorted events

c. Watermarks are generated with bounded out-of-orderness

d. You use session windows with a specific gap

e. In your experiment you only ever send 1000 events and then stop producing incoming events

Now to your questions:

  *   For processing time session windows, watermarks play no role whatsoever: you simply assume that you’ve seen all events belonging to a single transaction id once the last such event for that transaction id was processed sessionWindowGap milliseconds ago
  *   Therefore you see all enriched incoming events at the latest sessionWindowGap ms after the last incoming event (+ some latency)
  *   In event time mode, and hence with event time session windows, the situation is exactly the same, except that processing time plays no role
  *   A watermark means (ideally) that no event older than the watermark time ever follows the watermark (which itself is a meta-event that flows with the proper events on the same channels)
  *   In order for a session gap window to forward the enriched events, the window operator needs to receive a watermark that is sessionWindowGap milliseconds beyond the latest incoming event (in terms of the respective event time)
  *   In order to generate a new watermark that triggers this last session window, the watermark generator needs to encounter an (any) event that has a timestamp of (<latest event in session window> + outOfOrderness + sessionWindowGap + 1ms)
  *   Remember, the watermark generator never generates watermarks based on processing time, but only based on the timestamps it has seen in events actually encountered
  *   Coming back to your idleness configuration: it only means that the incoming stream becomes idle == timeless after a while … i.e. watermarks won’t make progress from this stream, and it tells all downstream operators so
  *   An idleness specification is only useful if the respective operator has another source of valid watermarks (e.g. after a union of two streams, one active/one idle ….). This is not your case
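
The arithmetic in the list above can be checked with a small sketch in plain Java. It is a simplified model under stated assumptions, not Flink code: the class SessionFiringSketch and its methods are hypothetical names, the numbers are made up, and the model assumes that a bounded-out-of-orderness generator emits (highest timestamp seen - outOfOrderness - 1 ms) and that an event-time window fires once the watermark reaches the window's last millisecond.

```java
public class SessionFiringSketch {

    /** Watermark emitted after the generator has seen maxTimestampSeen (assumed model). */
    public static long watermarkFor(long maxTimestampSeen, long outOfOrdernessMs) {
        return maxTimestampSeen - outOfOrdernessMs - 1;
    }

    /**
     * A session whose last event has timestamp lastEventTs covers
     * [lastEventTs, lastEventTs + gapMs); it fires once the watermark
     * reaches the last millisecond of that range.
     */
    public static boolean sessionFires(long lastEventTs, long gapMs, long watermark) {
        long windowMaxTimestamp = lastEventTs + gapMs - 1;
        return watermark >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        long lastEventTs = 1_000_000L;    // event time of the last event in the session (made up)
        long outOfOrdernessMs = 5_000L;   // hypothetical outOfOrderness of 5 s
        long gapMs = 10_000L;             // hypothetical sessionWindowGap of 10 s

        // Case 1: the producer stops after the last event, so the watermark stays behind it
        long stalledWatermark = watermarkFor(lastEventTs, outOfOrdernessMs);
        System.out.println(sessionFires(lastEventTs, gapMs, stalledWatermark)); // false

        // Case 2: some later event (any key) arrives with the timestamp named in the list above
        long laterEventTs = lastEventTs + outOfOrdernessMs + gapMs + 1;
        long advancedWatermark = watermarkFor(laterEventTs, outOfOrdernessMs);
        System.out.println(sessionFires(lastEventTs, gapMs, advancedWatermark)); // true
    }
}
```

Case 1 is the experiment with 1000 old events: once the input stops, no later timestamp is ever seen, so the sessions that were still open at that point are never emitted, no matter how much wall clock time passes.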

I hope this clarifies your situation.

Cheers


Thias


This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.

Re: Watermarks event time vs processing time

Posted by HG <ha...@gmail.com>.
Hello Matthias,

When I remove all the watermark strategies, it does not process anything.
For example, when I use WatermarkStrategy.noWatermarks() instead of the one
I built, nothing seems to happen at all.

Also, when I skip the part where I add wmStrategy to create tuple4dswm:
 DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

Nothing is processed.

Regards Hans-Peter

Op wo 16 mrt. 2022 om 15:52 schreef Schwalbe Matthias <
Matthias.Schwalbe@viseca.ch>:

> Hi Hanspeter,
>
>
>
> Let me relate some hints that might help you getting concepts clearer.
>
>
>
> From your description I make following assumptions where your are not
> specific enough (please confirm or correct in your answer):
>
>    1. You store incoming events in state per transaction_id to be
>    sorted/aggregated(min/max time) by event time later on
>    2. So far you used a session window to determine the point in time
>    when to emit the stored/enriched/sorted events
>    3. Watermarks are generated with bounded out of orderness
>    4. You use session windows with a specific gap
>    5. In your experiment you ever only send 1000 events and then stop
>    producing incoming events
>
>
>
> Now to your questions:
>
>    - For processing time session windows, watermarks play no role
>    whatsoever. You simply assume that you’ve seen all events belonging to a
>    single transaction id if the last such event for that transaction id
>    was processed sessionWindowGap milliseconds ago
>    - Therefore you see all enriched events at the latest
>    sessionWindowGap ms after the last incoming event (plus some latency)
>    - In event time mode, i.e. with event time session windows, the
>    situation is exactly the same, except that processing time plays no role
>    - A watermark means (ideally) that no event older than the watermark
>    time ever follows the watermark (which itself is a meta-event that flows
>    with the regular events on the same channels)
>    - In order for a session window to forward the enriched events, the
>    window operator needs to receive a watermark that is sessionWindowGap
>    milliseconds beyond the latest incoming event (in terms of the respective
>    event time)
>    - To generate a new watermark that triggers this last session window,
>    the watermark generator needs to encounter an (any) event that has a
>    timestamp of at least (<latest event in session window> + outOfOrderness
>    + sessionWindowGap + 1ms)
>    - Remember, the watermark generator never generates watermarks based
>    on processing time, but only based on the timestamps it has seen in events
>    actually encountered
>    - Coming back to your idleness configuration: it only means that the
>    incoming stream becomes idle == timeless after a while, i.e. watermarks
>    won’t make progress from this stream, and it tells all downstream
>    operators so
>    - An idleness specification is only useful if the respective operator
>    has another source of valid watermarks (e.g. after a union of two
>    streams, one active, one idle). This is not your case
>
> I hope this clarifies your situation.
>
> Cheers
>
> Thias

RE: Watermarks event time vs processing time

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Hanspeter,

Let me share some hints that might help make the concepts clearer.

From your description I make the following assumptions where you are not specific enough (please confirm or correct them in your answer):

  1.  You store incoming events in state per transaction_id to be sorted/aggregated(min/max time) by event time later on
  2.  So far you used a session window to determine the point in time when to emit the stored/enriched/sorted events
  3.  Watermarks are generated with bounded out of orderness
  4.  You use session windows with a specific gap
  5.  In your experiment you only ever send 1000 events and then stop producing events

Now to your questions:

  *   For processing time session windows, watermarks play no role whatsoever. You simply assume that you’ve seen all events belonging to a single transaction id if the last such event for that transaction id was processed sessionWindowGap milliseconds ago
  *   Therefore you see all enriched events at the latest sessionWindowGap ms after the last incoming event (plus some latency)
  *   In event time mode, i.e. with event time session windows, the situation is exactly the same, except that processing time plays no role
  *   A watermark means (ideally) that no event older than the watermark time ever follows the watermark (which itself is a meta-event that flows with the regular events on the same channels)
  *   In order for a session window to forward the enriched events, the window operator needs to receive a watermark that is sessionWindowGap milliseconds beyond the latest incoming event (in terms of the respective event time)
  *   To generate a new watermark that triggers this last session window, the watermark generator needs to encounter an (any) event that has a timestamp of at least (<latest event in session window> + outOfOrderness + sessionWindowGap + 1ms)
  *   Remember, the watermark generator never generates watermarks based on processing time, but only based on the timestamps it has seen in events actually encountered
  *   Coming back to your idleness configuration: it only means that the incoming stream becomes idle == timeless after a while, i.e. watermarks won’t make progress from this stream, and it tells all downstream operators so
  *   An idleness specification is only useful if the respective operator has another source of valid watermarks (e.g. after a union of two streams, one active, one idle). This is not your case
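
The event-time points above can be checked with a little arithmetic. What follows is a minimal plain-Java sketch, not Flink code: the class WatermarkSketch and its methods are hypothetical names, and the two formulas (watermark = highest timestamp seen - outOfOrderness - 1 ms, session window covering [lastEvent, lastEvent + gap)) are assumptions about how a bounded-out-of-orderness generator and an event-time session window behave. Under these assumptions the last session only fires once some later event with a timestamp of at least lastEvent + outOfOrderness + gap has been seen, so the bound with the extra 1 ms given above is on the safe side.

```java
/** Plain-Java model of watermark progress; no Flink dependency. */
final class WatermarkSketch {

    /** Assumed generator rule: the watermark trails the highest timestamp seen so far. */
    static long watermark(long maxTimestampSeen, long outOfOrdernessMs) {
        return maxTimestampSeen - outOfOrdernessMs - 1;
    }

    /** Assumed window rule: a session ending with lastEventTs covers [lastEventTs, lastEventTs + gapMs). */
    static boolean sessionFires(long lastEventTs, long gapMs, long watermark) {
        long windowMaxTimestamp = lastEventTs + gapMs - 1;
        return watermark >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        long outOfOrdernessMs = 5_000;
        long gapMs = 10_000;
        long lastEventTs = 1_000_000;

        // The producer stops after the last event: the watermark stays behind it.
        long stuck = watermark(lastEventTs, outOfOrdernessMs);
        System.out.println("fires without later event: " + sessionFires(lastEventTs, gapMs, stuck));

        // Any later event that is far enough ahead pushes the watermark past the window.
        long laterEventTs = lastEventTs + outOfOrdernessMs + gapMs;
        long advanced = watermark(laterEventTs, outOfOrdernessMs);
        System.out.println("fires with later event: " + sessionFires(lastEventTs, gapMs, advanced));
    }
}
```

In this model the first line reports false and the second true, which matches the observation that the sessions at the tail of a finite test run never get emitted in event time unless further events arrive.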

I hope this clarifies your situation.

Cheers


Thias


From: HG <ha...@gmail.com>
Sent: Wednesday, 16 March 2022 10:06
To: user <us...@flink.apache.org>
Subject: Watermarks event time vs processing time

Hi,

I read events in JSON format from a Kafka topic.
These events contain a handling time (aka event time) in epoch milliseconds, a transaction_id and a large nested JSON structure.
I need to group the events by transaction_id, order them by handling time and calculate the differences in handling time.
The events are updated with this calculated elapsed time and pushed further.
So all events that go in should come out with the elapsed time added.
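
The per-transaction step described above could look roughly like the following. This is a minimal plain-Java sketch without any Flink dependency: the class ElapsedTimeSketch, the Event shape and the method withElapsed are hypothetical names, and it assumes that "difference" means the delta to the previous event of the same transaction, with the first event getting 0.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch: elapsed time between consecutive events of one transaction. */
final class ElapsedTimeSketch {

    /** Hypothetical event shape; the real events also carry a large nested JSON payload. */
    static final class Event {
        final String transactionId;
        final long handlingTime; // epoch milliseconds
        final long elapsedMs;    // filled in by withElapsed

        Event(String transactionId, long handlingTime, long elapsedMs) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
            this.elapsedMs = elapsedMs;
        }
    }

    /** Sorts the events of one transaction by handling time and adds the delta to the previous event. */
    static List<Event> withElapsed(List<Event> sameTransaction) {
        List<Event> sorted = new ArrayList<>(sameTransaction);
        sorted.sort(Comparator.comparingLong((Event e) -> e.handlingTime));
        List<Event> result = new ArrayList<>(sorted.size());
        for (int i = 0; i < sorted.size(); i++) {
            Event current = sorted.get(i);
            // Assumption: the first event of a transaction has no predecessor, so its elapsed time is 0.
            long elapsed = (i == 0) ? 0L : current.handlingTime - sorted.get(i - 1).handlingTime;
            result.add(new Event(current.transactionId, current.handlingTime, elapsed));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("tx-1", 300L, 0L));
        events.add(new Event("tx-1", 100L, 0L));
        events.add(new Event("tx-1", 250L, 0L));
        for (Event e : withElapsed(events)) {
            System.out.println(e.transactionId + " " + e.handlingTime + " " + e.elapsedMs);
        }
    }
}
```

Every input event produces exactly one output event here, which is the "all events that go in should come out" requirement; in the job itself this logic would sit inside the window function once a session is complete.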

For testing I use events that are old (so the handling time is nowhere near the wall clock time).
Initially I used EventTimeSessionWindows, but somehow the processing did not run as expected.
When I pushed 1000 events, eventually only 800 or so would appear at the output.
This was resolved by switching to ProcessingTimeSessionWindows.
My thought was then that I could remove the watermark strategies with timestamp assigners (handling time) for the Kafka input stream and the data stream.
However, this was not the case.

Can anyone enlighten me as to why the watermark strategies are still needed?

Below the code

        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setProperties(kafkaProps)
                .setProperty("ssl.truststore.type", trustStoreType)
                .setProperty("ssl.truststore.password", trustStorePassword)
                .setProperty("ssl.truststore.location", trustStoreLocation)
                .setProperty("security.protocol", securityProtocol)
                .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
                .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
                .setGroupId(inputGroupId)
                .setClientIdPrefix(clientId)
                .setTopics(kafkaInputTopic)
                .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();

        /* A watermark is needed to prevent duplicates! */
        WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
                .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
                .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
                .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
                    @Override
                    public long extractTimestamp(ObjectNode element, long eventTime) {
                        return element.get("value").get("handling_time").asLong();
                    }
                });

        /* Use the watermark strategy to create a datastream */
        DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");

        /* Split the ObjectNode into a Tuple4 */
        DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());

        WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy = WatermarkStrategy
                .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
                .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
                    @Override
                    public long extractTimestamp(Tuple4<Long, Long, String, String> element, long eventTime) {
                        return element.f0;
                    }
                });

        DataStream<Tuple4<Long, Long, String, String>> tuple4dswm = tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

        DataStream<String>  tuple4DsWmKeyedbytr =  tuple4dswm
            .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
                @Override
                public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
                    return value.f2;
                }
            })
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
            .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
            .process(new MyProcessWindowFunction());


        KafkaSink<String> kSink = KafkaSink.<String>builder()
                .setBootstrapServers(outputBrokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(kafkaOutputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Sink to the Kafka topic
        tuple4DsWmKeyedbytr.sinkTo(kSink);