Posted to dev@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2021/08/05 10:21:58 UTC

Re: [DISCUSS] FLIP-180: Adjust StreamStatus and Idleness definition

Coming back to my previous comment: I would actually propose to separate
the discussion about whether to expose the WatermarkStatus in the sinks or
not from correcting the StreamStatus and Idleness definition in order to
keep the scope of this FLIP as small as possible. If there is a good reason
to expose the WatermarkStatus, then we can probably do it.

Cheers,
Till

On Fri, Jul 30, 2021 at 2:29 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Martijn,
>
> 1. Good question. The watermarks and statuses of the splits are first
> aggregated before being emitted through the reader. The watermark strategy
> of the user is actually applied on all SourceOutputs (=splits). Since one
> split is active and one is idle, the watermark of the reader will not
> advance until the user-defined idleness is triggered on the idle split. At
> this point, the combined watermark solely depends on the active split. The
> combined status remains ACTIVE.
> 2. Kafka has no dynamic partitions. This is a complete misnomer on the Flink
> side. In fact, if you search for Kafka and partition discovery, you will
> only find Flink resources. What we actually do is dynamic topic discovery
> and that can only be triggered through a pattern afaik. We could go for
> topic discovery on all patterns by default if we don't do that already.
> 3. Yes, idleness on assigned partitions would even work with dynamic
> assignments. I will update the FLIP to reflect that.
> 4. Afaik it was only meant for scenario 2 (and your question 3) and it
> should be this way after the FLIP. I don't know of any source
> implementation that uses the user-specified idleness to handle scenario 3.
> The thing that is currently extra is that some readers go idle when the
> reader doesn't have an active assignment.
>
> Best,
>
> Arvid
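
The aggregation described in point 1 can be sketched in Java as follows. This is a minimal, hypothetical illustration, not Flink's implementation: the class and method names (SplitWatermarkCombiner, onWatermark, markIdle) are assumptions made for this example. It only mirrors the behavior described above: idle splits are ignored when taking the minimum, and the reader as a whole is idle only when every split is idle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch (not a Flink class): combines per-split watermarks and idleness. */
final class SplitWatermarkCombiner {

    /** Per-split state; the field names are illustrative assumptions. */
    private static final class SplitState {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    private final Map<String, SplitState> splits = new LinkedHashMap<>();
    private long lastCombined = Long.MIN_VALUE;

    void addSplit(String splitId) {
        splits.computeIfAbsent(splitId, id -> new SplitState());
    }

    /** A new watermark from a split also marks that split as active again. */
    void onWatermark(String splitId, long watermark) {
        SplitState state = splits.computeIfAbsent(splitId, id -> new SplitState());
        state.idle = false;
        state.watermark = Math.max(state.watermark, watermark);
    }

    /** Called when the user-defined idleness timeout fires for a split. */
    void markIdle(String splitId) {
        splits.computeIfAbsent(splitId, id -> new SplitState()).idle = true;
    }

    /** True only if every known split is idle (vacuously true with no splits). */
    boolean isIdle() {
        return splits.values().stream().allMatch(s -> s.idle);
    }

    /** Minimum watermark over the active splits; never moves backwards. */
    long combinedWatermark() {
        long min = splits.values().stream()
                .filter(s -> !s.idle)
                .mapToLong(s -> s.watermark)
                .min()
                .orElse(lastCombined);
        lastCombined = Math.max(lastCombined, min);
        return lastCombined;
    }
}
```

With two splits where only one receives data, the combined watermark stays at its initial value until the silent split is marked idle; from then on it follows the active split alone, and the combined status stays active.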
>
> On Fri, Jul 30, 2021 at 12:17 PM Martijn Visser <ma...@ververica.com>
> wrote:
>
> > Hi all,
> >
> > I have a couple of questions after studying the FLIP and the docs:
> >
> > 1. What happens when one of the readers has two splits assigned and one of
> > the splits actually receives data?
> >
> > 2. If I understand it correctly the Kinesis Source uses dynamic shard
> > discovery by default (so in case of idleness scenario 3 would happen there)
> > and the FileSource also has a dynamic assignment. The Kafka Source doesn't
> > use dynamic partition discovery by default (so scenario 2 would be the
> > default to happen there). Why did we choose to not enable dynamic partition
> > discovery by default and should we actually change that?
> >
> > 3. To be sure, is it correct that in case of a dynamic assignment and there
> > is temporarily no data, that scenario 2 is applicable?
> >
> > 4. Does WatermarkStrategy#withIdleness currently cover scenario 2, 3 and
> > the one from my 3rd question? (edited)
> >
> > Best regards,
> >
> > Martijn
> >
> > On Fri, 23 Jul 2021 at 15:57, Till Rohrmann <tr...@apache.org> wrote:
> >
> > > Hi everyone,
> > >
> > > I would be in favour of what Arvid said about not exposing the
> > > WatermarkStatus to the Sink. Unless there is a very strong argument that
> > > this is required, I think that keeping this concept internal is the
> > > better choice right now. Moreover, as Arvid said, the downstream
> > > application can derive the WatermarkStatus on its own depending on its
> > > business logic.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Jul 23, 2021 at 2:15 PM Arvid Heise <ar...@apache.org> wrote:
> > >
> > > > Hi Eron,
> > > >
> > > > thank you very much for your feedback.
> > > >
> > > > > Please mention that the "temporary status toggle" code will be removed.
> > > > >
> > > > This code is already removed but there is still some automation of going
> > > > idle when temporarily no splits are assigned. I will include it in the
> > > > FLIP.
> > > >
> > > > > I agree with adding the markActive() functionality, for symmetry. Speaking
> > > > > of symmetry, could we now include the minor enhancement we discussed in
> > > > > FLIP-167, the exposure of watermark status changes on the Sink interface.
> > > > > I drafted a PR and would be happy to revisit it.
> > > > >
> > > > > https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
> > > >
> > > > I'm still not sure if that's a good idea.
> > > >
> > > > If we have now refined idleness to be a user-specified,
> > > > application-specific way to deal with temporarily stalled partitions,
> > > > then downstream applications should actually implement their own idleness
> > > > definition. Let's see what other devs think. I'm pinging the ones who have
> > > > been most involved in the discussion: @Stephan Ewen <sewen@apache.org>
> > > > @Till Rohrmann <tr...@apache.org> @Dawid Wysakowicz <dwysakowicz@apache.org>.
> > > >
> > > > > The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> > > > > class.  Should it be 'eventtime' package?
> > > > >
> > > > Are you proposing org.apache.flink.api.common.eventtime? I was simply
> > > > suggesting to rename
> > > > org.apache.flink.streaming.runtime.streamstatus but I'm very open to other
> > > > suggestions (given that there are only 2 classes in the package).
> > > >
> > > >
> > > > > Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> > > > > spell out what the new method names will be on each interface? May I
> > > > > suggest that Input.emitStreamStatus be Input.processStreamStatus? This is
> > > > > to help decouple the input's watermark status from the output's watermark
> > > > > status.
> > > > >
> > > > I haven't found
> > > > org.apache.flink.streaming.api.operators.Input#emitStreamStatus in master.
> > > > Could you double-check if I'm looking at the correct class?
> > > >
> > > > The current idea was mainly to grep+replace /streamStatus/watermarkStatus/
> > > > and /StreamStatus/WatermarkStatus/. But again I'm very open to more
> > > > descriptive names. I can add an explicit list later. I'm assuming you are
> > > > only interested in (semi-)public classes.
> > > >
> > > >
> > > > > I observe that AbstractStreamOperator is hardcoded to derive the output
> > > > > channel's status from the input channel's status.  May I suggest
> > > > > we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> > > > > allow for the operator subclass to customize the processing of the
> > > > > aggregated watermark and watermark status.
> > > > >
> > > > Can you add a motivation for that?
> > > > @Dawid Wysakowicz <dw...@apache.org>, I think you are the last
> > > > person that touched the code. Do you have some example operators in mind
> > > > that would change it?
> > > >
> > > > > Maybe the FLIP should spell out the expected behavior of the generic
> > > > > watermark generator (TimestampsAndWatermarksOperator).  Should the
> > > > > generator ignore the upstream idleness signal?  I believe it propagates the
> > > > > signal, even though it also generates its own signals.   Given that
> > > > > source-based and generic watermark generation shouldn't be combined, one
> > > > > could argue that the generic watermark generator should activate only when
> > > > > its input channel's watermark status is idle.
> > > > >
> > > > I will add a section. In general, we assume that we only have source-based
> > > > watermark generators once FLIP-27 is properly adopted.
> > > >
> > > > Best,
> > > >
> > > > Arvid
> > > >
> > > > On Wed, Jul 21, 2021 at 12:40 AM Eron Wright
> > > > <ew...@streamnative.io.invalid> wrote:
> > > >
> > > > > This proposal to narrow the definition of idleness to focus on the
> > > > > event-time clock is great.
> > > > >
> > > > > Please mention that the "temporary status toggle" code will be removed.
> > > > >
> > > > > I agree with adding the markActive() functionality, for symmetry. Speaking
> > > > > of symmetry, could we now include the minor enhancement we discussed in
> > > > > FLIP-167, the exposure of watermark status changes on the Sink interface.
> > > > > I drafted a PR and would be happy to revisit it.
> > > > >
> > > > > https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
> > > > >
> > > > > The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> > > > > class.  Should it be 'eventtime' package?
> > > > >
> > > > > Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> > > > > spell out what the new method names will be on each interface? May I
> > > > > suggest that Input.emitStreamStatus be Input.processStreamStatus? This is
> > > > > to help decouple the input's watermark status from the output's watermark
> > > > > status.
> > > > >
> > > > > I observe that AbstractStreamOperator is hardcoded to derive the output
> > > > > channel's status from the input channel's status.  May I suggest
> > > > > we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> > > > > allow for the operator subclass to customize the processing of the
> > > > > aggregated watermark and watermark status.
> > > > >
> > > > > Maybe the FLIP should spell out the expected behavior of the generic
> > > > > watermark generator (TimestampsAndWatermarksOperator).  Should the
> > > > > generator ignore the upstream idleness signal?  I believe it propagates the
> > > > > signal, even though it also generates its own signals.   Given that
> > > > > source-based and generic watermark generation shouldn't be combined, one
> > > > > could argue that the generic watermark generator should activate only when
> > > > > its input channel's watermark status is idle.
> > > > >
> > > > > Thanks again for this effort!
> > > > > -Eron
> > > > >
> > > > >
> > > > > On Sun, Jul 18, 2021 at 11:53 PM Arvid Heise <ar...@apache.org> wrote:
> > > > >
> > > > > > Dear devs,
> > > > > >
> > > > > > We recently discovered that StreamStatus and Idleness are insufficiently
> > > > > > defined [1], so I drafted a FLIP [3] to amend that situation. It would be
> > > > > > good to hear more opinions on that matter. Ideally, we can make the
> > > > > > changes to 1.14 as some newly added methods are affected.
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Arvid
> > > > > >
> > > > > > [1] https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> > > > > > [2] https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
> > > > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition

Re: [DISCUSS] FLIP-180: Adjust StreamStatus and Idleness definition

Posted by Arvid Heise <ar...@apache.org>.
Hi Dawid,

thanks for the clarification. Having looked into the related classes again, I
agree that exposing the current StreamStatus would severely limit future
developments. I guess to expose it in the Sink, we would need to have a new
WatermarkStatus in the eventtime package (as per Eron's suggestion) that is
independent of the internal status. It's very similar to how we have 2
Watermarks (and I now finally understand why there are 2 of them). I'd like
to leave that as future work.

With that being said, I started the vote thread about the original proposal
[1].

The explicit listing of the name changes is quite verbose, so I left it
out of the FLIP. I'd like to point to the POC PR [2] for details.
I can also add an appendix or even an attachment if someone needs it to
cast their vote.

[1]
https://lists.apache.org/thread.html/rcfcb9126e31d6641e1cc96834310c5b6fafff0c948973f97d1ac70f2%40%3Cdev.flink.apache.org%3E
[2] https://github.com/apache/flink/pull/16433

On Thu, Aug 5, 2021 at 3:06 PM Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hey all,
>
> Just a couple of comments from my side as I was called here.
>
> +1 for making stream status just about watermarks.
>
> I observe that AbstractStreamOperator is hardcoded to derive the output
> channel's status from the input channel's status.  May I suggest
> we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> allow for the operator subclass to customize the processing of the
> aggregated watermark and watermark status.
>
> The reason for making that method final from my side is very similar to
> the reason for this discussion. There is/was no clear definition of what
> StreamStatus is and what it is not. I did not want to let users arbitrarily
> play with it until we have clear semantics, even though I still consider the
> Operator API only semi-public.
>
> As for the question of whether we should expose it in sinks: if we say it
> is purely user-defined logic that affects only watermarks, I can imagine,
> though only with difficulty, that it could be persisted. Still, personally I
> don't think it is a good approach. As I mentioned a few times before, I see
> stream status as a tradeoff between correctness and making progress in
> real/processing time.
>
> However, if we decide to expose that in sinks, it must not be the
> StreamStatus we have now. It must be a completely new class. The current
> StreamStatus extends StreamElement, which is a really low-level concept
> that I am strongly against exposing in any kind of public API.
>
> Best,
>
> Dawid

Re: [DISCUSS] FLIP-180: Adjust StreamStatus and Idleness definition

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey all,

Just a couple of comments from my side as I was called here.

+1 for making stream status just about watermarks.

    I observe that AbstractStreamOperator is hardcoded to derive the output
    channel's status from the input channel's status.  May I suggest
    we refactor
    "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
    allow for the operator subclass to customize the processing of the
    aggregated watermark and watermark status.

The reason for making that method final from my side is very similar to
the reason for this discussion. There is/was no clear definition of what
StreamStatus is and what it is not. I did not want to let users
arbitrarily play with it until we have clear semantics, even though I
still consider the Operator API only semi-public.

As for the question of whether we should expose it in sinks: if we say it
is purely user-defined logic that affects only watermarks, I can imagine,
though only with difficulty, that it could be persisted. Still, personally
I don't think it is a good approach. As I mentioned a few times before, I
see stream status as a tradeoff between correctness and making progress in
real/processing time.

However, if we decide to expose that in sinks, it must not be the
StreamStatus we have now. It must be a completely new class. The current
StreamStatus extends StreamElement, which is a really low-level concept
that I am strongly against exposing in any kind of public API.
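
A minimal sketch of that separation, in Java. All type names here (InternalStreamElement, InternalStatus, PublicWatermarkStatus) are hypothetical stand-ins invented for this example and are not Flink classes. The point is only that a public-facing status can be a plain value with no inheritance link to the runtime's element hierarchy, with a one-way conversion at the boundary.

```java
/** Hypothetical stand-in for a low-level runtime element base type. */
abstract class InternalStreamElement {}

/** Hypothetical internal status that travels through the runtime next to records. */
final class InternalStatus extends InternalStreamElement {
    final boolean idle;

    InternalStatus(boolean idle) {
        this.idle = idle;
    }
}

/** Hypothetical public-facing status: a plain enum, independent of the runtime hierarchy. */
enum PublicWatermarkStatus {
    ACTIVE,
    IDLE;

    /** One-way conversion at the API boundary; the internal type never leaks out. */
    static PublicWatermarkStatus fromInternal(InternalStatus status) {
        return status.idle ? IDLE : ACTIVE;
    }
}
```

Because the public type shares nothing with the internal one, the internal class could later change or gain fields without affecting code written against the public enum.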

Best,

Dawid

On 05/08/2021 12:21, Till Rohrmann wrote:
> Coming back to my previous comment: I would actually propose to separate
> the discussion about whether to expose the WatermarkStatus in the sinks or
> not from correcting the StreamStatus and Idleness definition in order to
> keep the scope of this FLIP as small as possible. If there is a good reason
> to expose the WatermarkStatus, then we can probably do it.
>
> Cheers,
> Till
>
> On Fri, Jul 30, 2021 at 2:29 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Martijn,
>>
>> 1. Good question. The watermarks and statuses of the splits are first
>> aggregated before emitted through the reader. The watermark strategy of the
>> user is actually applied on all SourceOutputs (=splits). Since one split is
>> active and one is idle, the watermark of the reader will not advance until
>> the user-defined idleness is triggered on the idle split. At this point,
>> the combined watermark solely depends on the active split. The combined
>> status remains ACTIVE.
>> 2. Kafka has no dynamic partitions. This is a complete misnomer on Flink
>> side. In fact, if you search for Kafka and partition discovery, you will
>> only find Flink resources. What we actually do is dynamic topic discovery
>> and that can only be triggered through pattern afaik. We could go for topic
>> discovery on all patterns by default if we don't do that already.
>> 3. Yes, idleness on assigned partitions would even work with dynamic
>> assignments. I will update the FLIP to reflect that.
>> 4. Afaik it was only meant for scenario 2 (and your question 3) and it
>> should be this way after the FLIP. I don't know of any source
>> implementation that uses the user-specified idleness to handle scenario 3.
>> The thing that is currently extra is that some readers go idle, when the
>> reader doesn't have an active assignment.
>>
>> Best,
>>
>> Arvid
>>
>> On Fri, Jul 30, 2021 at 12:17 PM Martijn Visser <ma...@ververica.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a couple of questions after studying the FLIP and the docs:
>>>
>>> 1. What happens when one of the readers has two splits assigned and one
>> of
>>> the splits actually receives data?
>>>
>>> 2. If I understand it correctly the Kinesis Source uses dynamic shard
>>> discovery by default (so in case of idleness scenario 3 would happen
>> there)
>>> and the FileSource also has a dynamic assignment. The Kafka Source
>> doesn't
>>> use dynamic partition discovery by default (so scenario 2 would be the
>>> default to happen there). Why did we choose to not enable dynamic
>> partition
>>> discovery by default and should we actually change that?
>>>
>>> 3. To be sure, is it correct that in case of a dynamic assignment and
>> there
>>> is temporarily no data, that scenario 2 is applicable?
>>>
>>> 4. Does WatermarkStrategy#withIdleness currently cover scenario 2, 3 and
>>> the one from my 3rd question? (edited)
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Fri, 23 Jul 2021 at 15:57, Till Rohrmann <tr...@apache.org>
>> wrote:
>>>> Hi everyone,
>>>>
>>>> I would be in favour of what Arvid said about not exposing the
>>>> WatermarkStatus to the Sink. Unless there is a very strong argument
>> that
>>>> this is required I think that keeping this concept internal seems to me
>>> the
>>>> better choice right now. Moreover, as Arvid said the downstream
>>> application
>>>> can derive the WatermarkStatus on their own depending on its business
>>>> logic.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Jul 23, 2021 at 2:15 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Eron,
>>>>>
>>>>> thank you very much for your feedback.
>>>>>
>>>>> Please mention that the "temporary status toggle" code will be
>> removed.
>>>>> This code is already removed but there is still some automation of
>>> going
>>>>> idle when temporary no splits are assigned. I will include it in the
>>>> FLIP.
>>>>> I agree with adding the markActive() functionality, for symmetry.
>>>> Speaking
>>>>>> of symmetry, could we now include the minor enhancement we
>> discussed
>>> in
>>>>>> FLIP-167, the exposure of watermark status changes on the Sink
>>>> interface.
>>>>>> I drafted a PR and would be happy to revisit it.
>>>>>>
>>>>>>
>> https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
>>>>> I'm still not sure if that's a good idea.
>>>>>
>>>>> If we have now refined idleness to be an user-specified,
>>>>> application-specific way to handle with temporarily stalled
>> partitions,
>>>>> then downstream applications should actually implement their own
>>> idleness
>>>>> definition. Let's see what other devs think. I'm pinging the once
>> that
>>>> have
>>>>> been most involved in the discussion: @Stephan Ewen <
>> sewen@apache.org>
>>>>> @Till
>>>>> Rohrmann <tr...@apache.org> @Dawid Wysakowicz <
>>>> dwysakowicz@apache.org>
>>>>> .
>>>>>
>>>>> The flip mentions a 'watermarkstatus' package for the WatermarkStatus
>>>>>> class.  Should it be 'eventtime' package?
>>>>>>
>>>>> Are you proposing org.apache.flink.api.common.eventtime? I was simply
>>>>> suggesting to simply rename
>>>>> org.apache.flink.streaming.runtime.streamstatus but I'm very open for
>>>> other
>>>>> suggestions (given that there are only 2 classes in the package).
>>>>>
>>>>>
>>>>>> Regarding the change of 'streamStatus' to 'watermarkStatus', could
>>> you
>>>>>> spell out what the new method names will be on each interface? May
>> I
>>>>>> suggest that Input.emitStreamStatus be Input.processStreamStatus?
>>> This
>>>>> is
>>>>>> to help decouple the input's watermark status from the output's
>>>> watermark
>>>>>> status.
>>>>>>
>>>>> I haven't found
>>>>> org.apache.flink.streaming.api.operators.Input#emitStreamStatus in
>>>>> master. Could you double-check if I'm looking at the correct class?
>>>>>
>>>>> The current idea was mainly to grep+replace
>>>>> /streamStatus/watermarkStatus/ and /StreamStatus/WatermarkStatus/. But
>>>>> again, I'm very open to more descriptive names. I can add an explicit
>>>>> list later. I'm assuming you are only interested in (semi-)public
>>>>> classes.
>>>>>
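For illustration only, here is a minimal sketch of what the two case-sensitive substitutions above would do to a few identifiers. The class RenameSketch and its rename helper are hypothetical and not part of Flink; the inputs are names taken from this thread.

```java
import java.util.List;

public class RenameSketch {

    // Applies the two case-sensitive substitutions from the mail above:
    // /StreamStatus/WatermarkStatus/ and /streamStatus/watermarkStatus/.
    static String rename(String identifier) {
        return identifier
                .replace("StreamStatus", "WatermarkStatus")
                .replace("streamStatus", "watermarkStatus");
    }

    public static void main(String[] args) {
        // Identifiers mentioned in this thread, used purely as sample input.
        List<String> samples = List.of(
                "StreamStatus",
                "streamStatus",
                "emitStreamStatus",
                "org.apache.flink.streaming.runtime.streamstatus");
        for (String s : samples) {
            System.out.println(s + " -> " + rename(s));
        }
    }
}
```

Note that the all-lowercase package name is not matched by either pattern, so under this assumption the package would have to be renamed separately.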
>>>>>
>>>>>> I observe that AbstractStreamOperator is hardcoded to derive the
>>>>>> output channel's status from the input channel's status.  May I
>>>>>> suggest we refactor
>>>>>> "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
>>>>>> allow for the operator subclass to customize the processing of the
>>>>>> aggregated watermark and watermark status.
>>>>>>
>>>>> Can you add a motivation for that?
>>>>> @Dawid Wysakowicz <dw...@apache.org>, I think you are the last
>>>>> person that touched the code. Do you have some example operators in
>>>>> your head that would change it?
>>>>>
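To make the suggestion above concrete, the following is a rough sketch of one possible shape for such a hook. Everything here is hypothetical: BaseOperator merely stands in for an operator base class and is not Flink's AbstractStreamOperator, and the names onInputStatus and processWatermarkStatus are assumptions made for this example.

```java
public class StatusHookSketch {

    enum WatermarkStatus { ACTIVE, IDLE }

    // Hypothetical stand-in for an operator base class (not Flink code).
    static class BaseOperator {
        // Last status this operator sent downstream.
        WatermarkStatus lastEmitted = WatermarkStatus.ACTIVE;

        // Called with the status aggregated over all inputs.
        final void onInputStatus(WatermarkStatus aggregated) {
            lastEmitted = processWatermarkStatus(aggregated);
        }

        // Default behavior: the output status mirrors the input status.
        protected WatermarkStatus processWatermarkStatus(WatermarkStatus aggregated) {
            return aggregated;
        }
    }

    // A subclass that decouples its output status from its input status,
    // for example because it generates its own watermarks.
    static class AlwaysActiveOperator extends BaseOperator {
        @Override
        protected WatermarkStatus processWatermarkStatus(WatermarkStatus aggregated) {
            return WatermarkStatus.ACTIVE;
        }
    }

    public static void main(String[] args) {
        BaseOperator forwarding = new BaseOperator();
        BaseOperator custom = new AlwaysActiveOperator();
        forwarding.onInputStatus(WatermarkStatus.IDLE);
        custom.onInputStatus(WatermarkStatus.IDLE);
        System.out.println("forwarding: " + forwarding.lastEmitted);
        System.out.println("custom: " + custom.lastEmitted);
    }
}
```

The default keeps the hardcoded forwarding behavior described above, so only operators that override the hook would change what they emit.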
>>>>>> Maybe the FLIP should spell out the expected behavior of the generic
>>>>>> watermark generator (TimestampsAndWatermarksOperator).  Should the
>>>>>> generator ignore the upstream idleness signal?  I believe it
>>>>>> propagates the signal, even though it also generates its own signals.
>>>>>> Given that source-based and generic watermark generation shouldn't be
>>>>>> combined, one could argue that the generic watermark generator should
>>>>>> activate only when its input channel's watermark status is idle.
>>>>>>
>>>>> I will add a section. In general, we assume that we only have
>>>>> source-based watermark generators once FLIP-27 is properly adopted.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Wed, Jul 21, 2021 at 12:40 AM Eron Wright
>>>>> <ew...@streamnative.io.invalid> wrote:
>>>>>
>>>>>> This proposal to narrow the definition of idleness to focus on the
>>>>>> event-time clock is great.
>>>>>>
>>>>>> Please mention that the "temporary status toggle" code will be
>>>>>> removed. I agree with adding the markActive() functionality, for
>>>>>> symmetry. Speaking of symmetry, could we now include the minor
>>>>>> enhancement we discussed in FLIP-167, the exposure of watermark
>>>>>> status changes on the Sink interface? I drafted a PR and would be
>>>>>> happy to revisit it.
>>>>>>
>>>>>>
>>>>>> https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
>>>>>>
>>>>>> The flip mentions a 'watermarkstatus' package for the WatermarkStatus
>>>>>> class.  Should it be 'eventtime' package?
>>>>>>
>>>>>> Regarding the change of 'streamStatus' to 'watermarkStatus', could you
>>>>>> spell out what the new method names will be on each interface? May I
>>>>>> suggest that Input.emitStreamStatus be Input.processStreamStatus? This
>>>>>> is to help decouple the input's watermark status from the output's
>>>>>> watermark status.
>>>>>>
>>>>>> I observe that AbstractStreamOperator is hardcoded to derive the
>>>>>> output channel's status from the input channel's status.  May I
>>>>>> suggest we refactor
>>>>>> "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
>>>>>> allow for the operator subclass to customize the processing of the
>>>>>> aggregated watermark and watermark status.
>>>>>>
>>>>>> Maybe the FLIP should spell out the expected behavior of the generic
>>>>>> watermark generator (TimestampsAndWatermarksOperator).  Should the
>>>>>> generator ignore the upstream idleness signal?  I believe it
>>>>>> propagates the signal, even though it also generates its own signals.
>>>>>> Given that source-based and generic watermark generation shouldn't be
>>>>>> combined, one could argue that the generic watermark generator should
>>>>>> activate only when its input channel's watermark status is idle.
>>>>>>
>>>>>> Thanks again for this effort!
>>>>>> -Eron
>>>>>>
>>>>>>
>>>>>> On Sun, Jul 18, 2021 at 11:53 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>> Dear devs,
>>>>>>>
>>>>>>> We recently discovered that StreamStatus and Idleness are
>>>>>>> insufficiently defined [1], so I drafted a FLIP [3] to remedy that
>>>>>>> situation. It would be good to hear more opinions on that matter.
>>>>>>> Ideally, we can get the changes into 1.14 as some newly added
>>>>>>> methods are affected.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>> [1]
>>>>>>> https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
>>>>>>> [2]
>>>>>>> https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
>>>>>>> [3]
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition