Posted to dev@flink.apache.org by Jeff Carter <jp...@gmail.com> on 2022/02/01 20:34:30 UTC

Re: Need help with finding inner workings of watermark stream idleness

Thanks, Till.

That definitely helps a bit. I'm still not seeing where there is some idle
variable that output.markIdle() sets to true (or whatever it sets).
Ideally there would be an "output.isIdle()" that could be called to find
out whether the stream is idle. Since that doesn't exist, what is the
variable in "output" that dictates whether it is idle? Then I'd just have
to add an isIdle() method to make its state visible to other code.
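
A minimal sketch of the isIdle() idea described above, not Flink's actual
code: a decorator that forwards every call and remembers the last
idle/active transition. SimpleWatermarkOutput is a hypothetical, cut-down
stand-in for Flink's WatermarkOutput (which takes a Watermark object rather
than a long), and IdleTrackingOutput and isIdle() are names invented for
illustration. It assumes that an output counts as active again once the
next watermark is emitted.

```java
// Hypothetical stand-in for Flink's WatermarkOutput, reduced to what the sketch needs.
interface SimpleWatermarkOutput {
    void emitWatermark(long timestamp);

    void markIdle();
}

// Decorator that forwards every call and records whether the output is currently idle.
final class IdleTrackingOutput implements SimpleWatermarkOutput {
    private final SimpleWatermarkOutput delegate;
    private volatile boolean idle; // false until markIdle() is called

    IdleTrackingOutput(SimpleWatermarkOutput delegate) {
        this.delegate = delegate;
    }

    @Override
    public void emitWatermark(long timestamp) {
        // Assumption: emitting a watermark makes the output active again.
        idle = false;
        delegate.emitWatermark(timestamp);
    }

    @Override
    public void markIdle() {
        idle = true;
        delegate.markIdle();
    }

    // The accessor the question asks for; hypothetical, not part of Flink.
    boolean isIdle() {
        return idle;
    }
}
```

Wrapping the output keeps the bookkeeping in one place, so other code can
query the wrapper without changing the generator that calls markIdle().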

I see the checkIfIdle() method in the code (in at least the testing piece)
you pointed out, but that seems like it's just a way to set a timer and
check whether the idle state should be set. I don't know whether that sets
some isIdle variable or whether it's checked and calculated every time, in
which case that method is basically the variable I'm looking for. But
that might just be my confusion.
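
To make the "checked and calculated every time" possibility concrete, here
is a simplified sketch of a timeout-based idle check. It is not copied from
Flink: IdleTimerSketch, its fields, and the injectable clock are all
hypothetical names, and only checkIfIdle() echoes the method mentioned
above. In this sketch there is no stored isIdle flag; the answer is derived
on each call from an activity counter and the time at which inactivity was
first noticed.

```java
import java.util.function.LongSupplier;

// Simplified, hypothetical idle timer: the idle state is computed on demand, not stored.
final class IdleTimerSketch {
    private final LongSupplier nanoClock; // injectable clock, e.g. System::nanoTime
    private final long maxIdleNanos;
    private long activityCount;           // bumped once per event
    private long lastSeenCount;           // counter value at the previous check
    private boolean inactivityOpen;       // true once a check saw no new activity
    private long inactivityStartNanos;    // when that inactivity window started

    IdleTimerSketch(LongSupplier nanoClock, long maxIdleNanos) {
        this.nanoClock = nanoClock;
        this.maxIdleNanos = maxIdleNanos;
    }

    // Called for every event; deliberately cheap (no clock read).
    void activity() {
        activityCount++;
    }

    // Called periodically; returns true once the inactivity window exceeds the timeout.
    boolean checkIfIdle() {
        if (activityCount != lastSeenCount) {
            // Events arrived since the last check: remember that and close the window.
            lastSeenCount = activityCount;
            inactivityOpen = false;
            return false;
        }
        long now = nanoClock.getAsLong();
        if (!inactivityOpen) {
            // First check without new activity: start timing from here.
            inactivityOpen = true;
            inactivityStartNanos = now;
            return false;
        }
        return now - inactivityStartNanos > maxIdleNanos;
    }
}
```

With a 500 ms timeout, a caller would construct it as
new IdleTimerSketch(System::nanoTime, 500_000_000L), call activity() per
event, and call markIdle() on the output whenever a periodic checkIfIdle()
returns true.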

On Tue, Jan 11, 2022, 11:05 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi Jeff,
>
> I think this happens in the WatermarksWithIdleness [1].
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java#L73
>
> Cheers,
> Till
>
> On Tue, Jan 11, 2022 at 6:05 PM Jeff Carter <jp...@gmail.com> wrote:
>
> > I'm looking into making a feature for Flink related to watermarks and am
> > digging into the inner watermark mechanisms, specifically with idleness.
> > I'm familiar with idleness, but digging into the root code I can only get
> > to where idlenessTimeout gets set in WatermarkStrategyWithIdleness.java.
> >
> > But what I'm looking for is the pieces beyond that. If I set the idleness
> > to 500 milliseconds, where in the code does it actually go "I haven't seen
> > a message in 500 milliseconds. I'm setting this stream to idle."?
> >
> > The reason is that what I'm thinking of would need to be able to see if
> > any streams are marked idle, and if so react accordingly.
> >
> > Thanks for any help in advance.
> >
>

Re: Need help with finding inner workings of watermark stream idleness

Posted by Jeff Carter <jp...@gmail.com>.
Thanks, Seth. Yeah, this looks perfect.

I had a feeling I'd need to get deep into things, and no time like the
present haha.

I may ask for more guidance on those inner workings to get a bit of a road
map. But that gets into the feature idea and beyond the scope of this
thread's original question, so I'll just do that in a Jira ticket in a bit.
I just wanted this so I could structure the ticket and plan of attack better.

Thanks!!

On Tue, Feb 1, 2022, 2:03 PM Seth Wiesman <sj...@gmail.com> wrote:

> Hi Jeff,
>
> I think the class you're looking for is StatusWatermarkValve. Note that
> this is fairly deep into the runtime stack.
>
> Seth

Re: Need help with finding inner workings of watermark stream idleness

Posted by Seth Wiesman <sj...@gmail.com>.
Hi Jeff,

I think the class you're looking for is StatusWatermarkValve. Note that
this is fairly deep into the runtime stack.

Seth