Posted to user@flink.apache.org by Gnanasoundari Soundarajan <gn...@man-es.com> on 2020/05/14 04:14:01 UTC

Watermarks and parallelism

Hi all,

I have the following questions about Flink. Could anyone help me understand?

Questions:

1. Is the watermark maintained globally at the operator level?

2. When we have a keyBy operator with parallelism > 1, is a single watermark maintained across all the parallel subtasks, or one for each parallel subtask?

3. Assuming I have a keyBy operator with parallelism > 1, is it possible to feed data to this operator from only one stream of the previous operator (say, map (1) always goes to window (1))?

Regards,
Gnana

Re: Watermarks and parallelism

Posted by Arvid Heise <ar...@ververica.com>.
Hi Gnanasoundari,

There are two things that you need to choose:
* maxOutOfOrderness [1], which determines how long you wait before closing
the windows for the first time
* and allowedLateness [2], which allows late events to be processed and to
trigger update events

In general, you have to make some assumptions. Even without Flink, you need
to pick a point in time at which you kick off your calculation for the first
time. You can then choose to update the results if more data comes in.
However, you still need to pick a maximum wait time to handle disconnected
assets and to eventually free the resources.
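
To make the interplay of the two settings concrete, here is a minimal plain-Java sketch. It is not Flink API: the class `LatenessSketch`, its `Outcome` enum and all parameter names are made up for illustration. It assumes tumbling windows, a watermark that trails the highest timestamp seen by maxOutOfOrderness, and that a window is complete once the watermark reaches its last timestamp (end - 1), which is how I understand Flink's event-time trigger; treat that boundary as an assumption.

```java
/** Plain-Java model (not Flink API) of maxOutOfOrderness and allowedLateness. */
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_UPDATE, DROPPED }

    private final long maxOutOfOrdernessMs;
    private final long allowedLatenessMs;
    private final long windowSizeMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    LatenessSketch(long maxOutOfOrdernessMs, long allowedLatenessMs, long windowSizeMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.allowedLatenessMs = allowedLatenessMs;
        this.windowSizeMs = windowSizeMs;
    }

    /** The watermark trails the highest timestamp seen so far by maxOutOfOrderness. */
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** Classifies an event against the watermark as it stood before the event arrived. */
    Outcome onEvent(long timestampMs) {
        long watermark = currentWatermark();
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long lastTimestampInWindow = windowStart + windowSizeMs - 1;
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);

        if (watermark < lastTimestampInWindow) {
            return Outcome.ON_TIME;      // window has not fired yet
        }
        if (watermark < lastTimestampInWindow + allowedLatenessMs) {
            return Outcome.LATE_UPDATE;  // window fired, state still kept: emit an update
        }
        return Outcome.DROPPED;          // window state already cleaned up
    }
}
```

With `new LatenessSketch(5_000, 60_000, 60_000)`, an event for the first minute that arrives after the watermark has passed that minute still produces an update, while the same event arriving more than allowedLateness behind the watermark is dropped. For assets that reconnect days later, both values would have to cover days, and window state would be kept that long.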

For your questions:
1) If assets may be down, you probably want to introduce an idle timeout in
your watermark assigner, like this:
https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
2) That's where allowedLateness comes in. You can configure Flink to ignore
late events (default) or to perform recalculation and emit update events.
Note that your downstream pipelines need to be able to handle such update
events.
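
The idle timeout from 1) can be modelled in a few lines of plain Java. This is only a sketch of the idea and not the code from the linked training repository: `IdleAwareWatermark`, the source names and the explicit `nowMs` clock parameter are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model (not Flink API): minimum watermark over the sources that are not idle. */
final class IdleAwareWatermark {
    private final long idleTimeoutMs;
    private final Map<String, Long> watermarkBySource = new HashMap<>();
    private final Map<String, Long> lastSeenAtBySource = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermark(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Records an event timestamp from a source, observed at wall-clock time nowMs. */
    void onEvent(String source, long eventTimestampMs, long nowMs) {
        watermarkBySource.merge(source, eventTimestampMs, Long::max);
        lastSeenAtBySource.put(source, nowMs);
    }

    /** Combined watermark at nowMs; sources silent for longer than the timeout are ignored. */
    long combinedWatermark(long nowMs) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<String, Long> entry : watermarkBySource.entrySet()) {
            boolean idle = nowMs - lastSeenAtBySource.get(entry.getKey()) > idleTimeoutMs;
            if (!idle) {
                min = Math.min(min, entry.getValue());
            }
        }
        // A watermark never moves backwards, even when an idle source resumes.
        if (min != Long.MAX_VALUE && min > lastEmitted) {
            lastEmitted = min;
        }
        return lastEmitted;
    }
}
```

The trade-off is visible in the last step: once a silent asset is declared idle, the watermark moves on without it, so whatever that asset sends after reconnecting is behind the watermark and falls under the allowedLateness handling from 2).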

I also recommend checking out the retract streams of the Table API [3].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#with-periodic-watermarks
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html#allowed-lateness
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion



-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Watermarks and parallelism

Posted by Gnanasoundari Soundarajan <gn...@man-es.com>.
Thanks, Arvid, for the explanation.

Assume we have 5 assets:
Assets 1 and 2 are sending the current timestamp, May 17th 00:00.
Asset 3 went down on May 15th 00:00, restarted on May 17th 00:00, and started sending data from May 15th 00:00.
Asset 4 went down on May 16th 00:00 and is still down.
Asset 5 went down on May 14th 00:00, restarted on May 17th 01:00, and started sending data from May 14th 00:00.

In the above scenario,

  *   Due to the uncertainty of asset connectivity, I can’t predict the out-of-orderness duration.
  *   Not all assets will communicate all the time.

I have a Kafka topic as both the source and the sink of the Flink job, and I have an event-time window of 1 min.

Questions:

  1.  As I have 5 assets, if I set a parallelism of 5 at the window operator level, will there be any issue with watermark progression when asset 4 is not communicating? Assume I use keyBy on the asset ID (1 to 5).
  2.  Assume the window operator chose May 15th 00:00 of asset 3 as the watermark, since it is the minimum event time across the subtasks of the window. If asset 5 then sends data for May 14th 00:00, will asset 5's data not be dropped as late data?

Regards,
Gnana


Re: Watermarks and parallelism

Posted by Arvid Heise <ar...@ververica.com>.
Hi Gnanasoundari,

Your use case is very typical and pretty much the main motivation for event
time and watermarks. It's supported out of the box. I recommend re-reading
the first resource that Alex linked.

To make it clear, let's have a small example:

Source 1 -\
                 +--> Window --> Sink
Source 2 -/

Consider source 1 being 1s ahead of source 2. Then its watermarks are also
1s ahead. At the window level, however, the watermark only advances to the
minimum of the input watermarks! That's why no data is lost in your case.

In particular, consider the following events that pop up once per second
that just need to be summed up per minute.
Source1: (00:01, s1_event1), ..., (00:59, s1_event59), (01:00, s1_event60),
(01:01, s1_event61), ...
Source2: (00:00, s2_event1), ..., (00:58, s2_event59), (00:59, s2_event60),
(01:00, s2_event61), ...
For simplicity, assume that watermark = event timestamp.

Then consider a window [00:00, 00:59]: this window will only close,
perform the aggregation, and fire the result once the watermarks from both
sources have reached 01:00 (so when the event with that timestamp occurs).
It will contain 59 events from Source1 and 60 events from Source2.
In particular, when event s1_event60 arrives at 01:00, it carries over to
the next window [01:00, 01:59], while the previous window is still open for
events from Source2. Only after receiving s2_event61 at 01:00 will the first
window result in an output event.

Of course that also means that data from quick sources need to live as long
in the main memory (or actually state backend) as it takes for the slowest
source to catch up.
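
The example above can be replayed with a small plain-Java simulation. It is a sketch, not Flink code: `TwoSourceWindowSketch` is a made-up name, time is counted in whole seconds, and, as in the example, the watermark of each source is simply its latest event timestamp.

```java
/** Plain-Java replay (not Flink API) of the two-source example for the first one-minute window. */
final class TwoSourceWindowSketch {

    /** Returns {firingTick, eventsFromSource1, eventsFromSource2} for the window [00:00, 00:59]. */
    static int[] simulate() {
        final int windowEnd = 60;              // 01:00, first second of the next window
        int count1 = 0;
        int count2 = 0;
        for (int tick = 0; tick <= 60; tick++) {
            int ts1 = tick + 1;                // Source1 is one second ahead
            int ts2 = tick;
            if (ts1 < windowEnd) count1++;     // otherwise the event belongs to the next window
            if (ts2 < windowEnd) count2++;
            // The window operator only advances to the minimum of both watermarks.
            int operatorWatermark = Math.min(ts1, ts2);
            if (operatorWatermark >= windowEnd) {
                return new int[] {tick, count1, count2};
            }
        }
        throw new IllegalStateException("window never fired");
    }

    public static void main(String[] args) {
        int[] result = simulate();
        System.out.println("fired at second " + result[0]
                + " with " + result[1] + " + " + result[2] + " events");
    }
}
```

The window fires only at second 60, when Source2 delivers its 01:00 event, and it holds 59 events from Source1 and 60 from Source2. The Source1 event from 01:00 arrived one second earlier and is already counted towards the next window.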



Re: Watermarks and parallelism

Posted by Gnanasoundari Soundarajan <gn...@man-es.com>.
Thanks, Alexander, for your detailed response.

I have a requirement that each asset may report a different event time due to connectivity issues. If I have 50 assets and each reports a different event time, I should not lose data because of lateness.

To handle this, I tried using the keyBy operator to route the data by asset and to maintain a watermark per asset (key) in a KeyedProcessFunction by registering an event-time timer for each asset (key). When I tried this, I observed that the event-time timer is not triggered in the KeyedProcessFunction, and hence data didn’t flow downstream.

I am curious whether this requirement is feasible in Flink using event time.

Regards,
Gnana


Re: Watermarks and parallelism

Posted by Alexander Fedulov <al...@ververica.com>.
Hi Gnana,

1. No, watermarks are generated independently per subtask. I think this
section of the docs might make things clearer: [1].

2. The same watermark from the input of the keyBy will be dispatched to all
of the instances of the downstream keyed operator. That said, there is no
global coordination between the subtasks. The same watermark can arrive at
the downstream subtasks at different times, depending on how long it
spends in the input channels. Notice also that watermarks are managed
on the subtask level, not at the level of the individual keys.

3. I am not quite sure I understand what you mean by this one and what
exactly you are trying to achieve. I assume you basically want parallel
windows that are scoped to all of the items coming from the corresponding
subtask of the previous non-keyed operator. As Flink windows can be executed
in parallel only on keyed streams, you could use a little trick:
`reinterpretAsKeyedStream` [2].
This makes it possible to basically have "passthrough" partitioning,
without an actual data shuffle. An alternative would be to implement
your map function as a RichMapFunction, which gives you access to the
runtime context. From there:
1) use `getRuntimeContext().getIndexOfThisSubtask();` to retrieve the ID of
the current subtask
2) enrich your events with a new field containing the subtask ID
3) use this ID as the key in your keyBy operator
The problem is that both of those approaches will be non-deterministic in
terms of state recovery when, for instance, you would like to scale out
your job to a higher degree of parallelism. You'd need to decide if this is
relevant for your use case.
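
Points 1 and 2 can be illustrated with a tiny plain-Java model of a single downstream subtask. It is not Flink API and not how Flink implements it internally: `SubtaskWatermark` and the channel numbering are assumptions for illustration. Each subtask tracks the latest watermark per input channel and takes the minimum, without looking at any other subtask.

```java
import java.util.Arrays;

/** Plain-Java model (not Flink API): the watermark of one downstream subtask. */
final class SubtaskWatermark {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    /** inputChannels must be at least 1 (one channel per upstream subtask). */
    SubtaskWatermark(int inputChannels) {
        channelWatermarks = new long[inputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Called when a watermark arrives on one input channel; returns this subtask's watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min); // never moves backwards
        return current;
    }

    long current() {
        return current;
    }
}
```

If two upstream subtasks emit watermarks 100 and 80, a window subtask that has already received both is at 80, while a second window subtask that has so far only received the 100 has not advanced at all. Both end up at 80 eventually, but at any given moment they may disagree, and all keys handled by one subtask share that subtask's watermark.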

[1]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/timely-stream-processing.html#watermarks-in-parallel-streams
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream

Best,

--

Alexander Fedulov | Solutions Architect

+49 1514 6265796



