Posted to dev@beam.apache.org by Luke Cwik <lc...@google.com> on 2020/03/02 23:32:32 UTC

Re: Java SplittableDoFn Watermark API

Jan, there are some parts of Apache Beam that the watermarks package will
likely rely on (the @Experimental annotation, javadoc links), but
fundamentally it should not need to rely on core, and someone could create
a separate package for this.

Ismael, the unification of bounded/unbounded within SplittableDoFn has
always been a goal. There is a set of features that BoundedSources are
unlikely to use but would still be allowed to use. For example, bounded
sources may want support for checkpointing, since I could foresee a
BoundedSource that notices that a certain resource has become unavailable
and can only process it later. The choice of watermark estimator is a
likely point of difference between bounded and unbounded SDFs: bounded SDFs
could use a very simple estimator where the watermark is held at -infinity
and only advances to +infinity once there is no more data to process. But
even though unbounded SDFs are likely to be the most common users of varied
watermark estimators, a bounded SDF may still want to advance the watermark
as it reads records, so that more "streaming" runners (for example, micro
batch) could process the entire pipeline in parallel, whereas other runners
execute one whole segment of the pipeline at a time.
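A minimal, self-contained Java sketch of the contrast described above, under a deliberately simplified model: `WatermarkEstimator`, `HoldUntilDoneEstimator`, `AdvancingEstimator`, and `BoundedReadDemo` are hypothetical names written for illustration (they are not Beam classes), and the advancing estimator assumes records arrive roughly in timestamp order. It counts how many records must be read before a downstream window ending at 10s could be treated as complete under each estimator.

```java
import java.util.List;

/** Hypothetical, simplified estimator interface used only for this sketch (not Beam's API). */
interface WatermarkEstimator {
  /** Called once per record read, with that record's event timestamp in millis. */
  void observe(long timestampMillis);

  /** Called once the restriction has no more data to process. */
  void finish();

  /** Long.MIN_VALUE and Long.MAX_VALUE stand in for -infinity and +infinity. */
  long currentWatermark();
}

/** "Bounded" style: held at -infinity until all data is read, then jumps to +infinity. */
class HoldUntilDoneEstimator implements WatermarkEstimator {
  private boolean done = false;

  @Override public void observe(long timestampMillis) { /* record timestamps are ignored */ }
  @Override public void finish() { done = true; }
  @Override public long currentWatermark() { return done ? Long.MAX_VALUE : Long.MIN_VALUE; }
}

/** Advancing style: follows the largest timestamp seen (assumes roughly in-order input). */
class AdvancingEstimator implements WatermarkEstimator {
  private long maxSeen = Long.MIN_VALUE;
  private boolean done = false;

  @Override public void observe(long timestampMillis) { maxSeen = Math.max(maxSeen, timestampMillis); }
  @Override public void finish() { done = true; }
  @Override public long currentWatermark() { return done ? Long.MAX_VALUE : maxSeen; }
}

class BoundedReadDemo {
  /**
   * Reads records in order and returns how many had been read when a window ending at
   * windowEndMillis (exclusive) could first be treated as complete, i.e. when the
   * watermark reached windowEndMillis. Returns records.size() + 1 if that only
   * happens after finish() is called.
   */
  static int recordsReadBeforeWindowCompletes(
      WatermarkEstimator estimator, List<Long> records, long windowEndMillis) {
    int read = 0;
    for (long ts : records) {
      estimator.observe(ts);
      read++;
      if (estimator.currentWatermark() >= windowEndMillis) {
        return read;
      }
    }
    estimator.finish();
    return read + 1;
  }

  public static void main(String[] args) {
    List<Long> records = List.of(1_000L, 5_000L, 9_000L, 12_000L, 15_000L);
    long windowEnd = 10_000L;
    System.out.println(recordsReadBeforeWindowCompletes(new HoldUntilDoneEstimator(), records, windowEnd)); // prints 6
    System.out.println(recordsReadBeforeWindowCompletes(new AdvancingEstimator(), records, windowEnd));     // prints 4
  }
}
```

With the hold-style estimator the window over [0s, 10s) can only be treated as complete after the whole restriction is read; with the advancing estimator it can be treated as complete once the record at 12s is read, which is what would let a micro-batch or streaming runner overlap stages.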

Currently, the watermark reported as part of the PollResult is passed to
the ProcessContext.updateWatermark function [1, 2]; instead, that call would
be redirected to the ManualWatermarkEstimator.setWatermark function [3].

1:
https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
2:
https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
3:
https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
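A rough sketch of that redirection in plain Java with no Beam dependency. `ManualEstimator`, `SimpleManualEstimator`, `PollResult`, and `WatchSketch.pollOnce` are hypothetical simplifications (Beam's real Watch poll result and ManualWatermarkEstimator have richer signatures); the point is only that the watermark carried by a poll result is handed to the estimator's `setWatermark` rather than to a ProcessContext method.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a manual watermark estimator (not Beam's interface). */
interface ManualEstimator {
  void setWatermark(Instant watermark);
  Instant currentWatermark();
}

/** Minimal implementation that simply remembers the last value it was given. */
class SimpleManualEstimator implements ManualEstimator {
  private Instant watermark;

  SimpleManualEstimator(Instant initial) { this.watermark = initial; }

  @Override public void setWatermark(Instant watermark) { this.watermark = watermark; }
  @Override public Instant currentWatermark() { return watermark; }
}

/** Hypothetical, simplified poll result: newly discovered outputs plus an optional watermark. */
final class PollResult {
  final List<String> outputs;
  final Instant watermark; // null when the poll function reports no watermark

  PollResult(List<String> outputs, Instant watermark) {
    this.outputs = outputs;
    this.watermark = watermark;
  }
}

class WatchSketch {
  /**
   * One polling round: collect the new outputs and, instead of calling something like
   * ProcessContext#updateWatermark, forward the reported watermark to the estimator.
   */
  static List<String> pollOnce(Supplier<PollResult> pollFn, ManualEstimator estimator) {
    PollResult result = pollFn.get();
    if (result.watermark != null) {
      estimator.setWatermark(result.watermark);
    }
    return new ArrayList<>(result.outputs);
  }

  public static void main(String[] args) {
    ManualEstimator estimator = new SimpleManualEstimator(Instant.EPOCH);
    List<String> emitted = pollOnce(
        () -> new PollResult(List.of("file-a", "file-b"), Instant.ofEpochSecond(60)), estimator);
    // prints [file-a, file-b] watermark=1970-01-01T00:01:00Z
    System.out.println(emitted + " watermark=" + estimator.currentWatermark());
  }
}
```

A poll that reports no watermark leaves the estimator untouched, so the last reported value keeps holding until the poll function says otherwise.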

On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ie...@gmail.com> wrote:

> I just realized that the HBaseIO example is not a good one, because we can
> already have Watch-like behavior, as we do for partition discovery in
> HCatalogIO.
> Still, I am interested in your views on bounded/unbounded unification.
>
> Interesting question 2: how will these annotations connect with the Watch
> transform polling patterns?
>
> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
>
>
> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Really interesting! Implementing the watermark correctly has been a common
>> struggle for IO authors, to the point that some IOs still have issues
>> around that. So +1 for this, in particular if we can get to reuse common
>> patterns. I was not aware of Boyuan's work around this, really nice.
>>
>> One aspect I have always been confused about since I read the SDF proposal
>> documents is whether we could have a single API for both Bounded and
>> Unbounded IO by treating a BoundedSDF as a special case of an UnboundedSDF.
>> Could WatermarkEstimator help in this direction?
>>
>> One quick case that I can think of is to make the current HBaseIO SDF work
>> in an unbounded manner, for example to 'watch and read new tables'.
>>
>>
>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> See this doc[1] and blog[2] for some context about SplittableDoFns.
>>>
>>> To support watermark reporting within the Java SDK for SplittableDoFns,
>>> we need a way for SDF authors to report watermark estimates over the
>>> element and restriction pair that they are processing.
>>>
>>> For UnboundedSources, asking each SDF author to write their own watermark
>>> estimation was found to be a pain point, and it typically prevented
>>> re-use. Therefore we would like to have a "library" of watermark
>>> estimators that help SDF authors perform this estimation, similar to the
>>> "library" of restrictions and restriction trackers that SDF authors can
>>> use. SDF authors for whom the existing library doesn't work can add
>>> additional estimators that observe the timestamps of elements, or choose
>>> to report the watermark directly through a "ManualWatermarkEstimator"
>>> parameter that can be supplied to @ProcessElement methods.
>>>
>>> The public-facing portion of the DoFn changes adds three new annotations
>>> for new DoFn-style methods:
>>> - GetInitialWatermarkEstimatorState: returns the initial watermark state,
>>>   similar to GetInitialRestriction.
>>> - GetWatermarkEstimatorStateCoder: returns a coder compatible with the
>>>   watermark state type, similar to GetRestrictionCoder for restrictions
>>>   returned by GetInitialRestriction.
>>> - NewWatermarkEstimator: returns either a watermark estimator that the
>>>   framework invokes, allowing it to observe the timestamps of output
>>>   records, or a manual watermark estimator that can be explicitly invoked
>>>   to update the watermark.
>>>
>>> See [3] for an initial PR with the public-facing additions to the core
>>> Java API related to SplittableDoFn.
>>>
>>> This mirrors a bunch of work that was done by Boyuan within the Python
>>> SDK [4, 5], but in the style of the new DoFn parameter/method invocation
>>> we have in the Java SDK.
>>>
>>> 1: https://s.apache.org/splittable-do-fn
>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>>> 3: https://github.com/apache/beam/pull/10992
>>> 4: https://github.com/apache/beam/pull/9794
>>> 5: https://github.com/apache/beam/pull/10375
>>>
>>
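The three proposed methods appear to follow the same pattern as the restriction methods: create an initial state, build an estimator from that state, and, presumably, persist the estimator's state (encoded with the state coder) when processing is checkpointed so that a fresh estimator can resume from it. The plain-Java sketch below models only that lifecycle. `StatefulEstimator`, `MonotonicEstimator`, `initialState`, and `newEstimator` are hypothetical stand-ins for methods annotated with @GetInitialWatermarkEstimatorState and @NewWatermarkEstimator, and the coder step is reduced to carrying a `long` across.

```java
/** Hypothetical estimator whose state can be saved and restored (not Beam's interface). */
interface StatefulEstimator<StateT> {
  void observeTimestamp(long timestampMillis);
  long currentWatermark();
  /** State to persist at a checkpoint so that a new estimator can resume from it. */
  StateT getState();
}

/** Keeps the watermark at the maximum timestamp observed, never moving it backwards. */
class MonotonicEstimator implements StatefulEstimator<Long> {
  private long watermark;

  MonotonicEstimator(long initialWatermark) { this.watermark = initialWatermark; }

  @Override public void observeTimestamp(long timestampMillis) {
    watermark = Math.max(watermark, timestampMillis);
  }
  @Override public long currentWatermark() { return watermark; }
  @Override public Long getState() { return watermark; }
}

class EstimatorLifecycle {
  /** Plays the role of a @GetInitialWatermarkEstimatorState method (hypothetical). */
  static long initialState(long elementTimestampMillis) { return elementTimestampMillis; }

  /** Plays the role of a @NewWatermarkEstimator method (hypothetical). */
  static MonotonicEstimator newEstimator(long state) { return new MonotonicEstimator(state); }

  public static void main(String[] args) {
    // First attempt: start from the element's timestamp and observe some outputs.
    MonotonicEstimator first = newEstimator(initialState(1_000L));
    first.observeTimestamp(4_000L);
    first.observeTimestamp(3_000L); // an out-of-order timestamp does not move the watermark back
    long checkpointed = first.getState(); // would be encoded with the state coder

    // After resuming from the checkpoint, a fresh estimator continues from the saved state.
    MonotonicEstimator resumed = newEstimator(checkpointed);
    resumed.observeTimestamp(6_000L);
    System.out.println(checkpointed + " -> " + resumed.currentWatermark()); // prints 4000 -> 6000
  }
}
```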

Re: Java SplittableDoFn Watermark API

Posted by Luke Cwik <lc...@google.com>.
Using the interfaces defined in pr/10992 [1], I started migrating from
ProcessContext#updateWatermark to the WatermarkEstimators with pr/11126 [2].

The PR is very much a WIP, but it does include the necessary changes to the
Watch transform and to the UnboundedSource SDF wrapper to be able to report
watermarks via the WatermarkEstimator. Another interesting point is that we
expect the setWatermark/observeTimestamp methods to be high volume, while
currentWatermark/getState are low volume in comparison, so I restructured
the current set of WatermarkEstimators around that and also intend to move
the validation of the watermark values to more appropriate places.

1: https://github.com/apache/beam/pull/10992
2: https://github.com/apache/beam/pull/11126
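One way to read that restructuring: keep the per-record methods down to a single field update and do any heavier work, such as validation, only in the infrequent `currentWatermark`/`getState` calls. The sketch below is a hypothetical plain-Java illustration of that trade-off, not the code in pr/11126; `CheapManualEstimator` is an illustrative name, and the validation rule (rejecting a watermark that moves backwards) is an assumption chosen for the example.

```java
/**
 * Hypothetical manual estimator (not Beam's class). setWatermark is assumed to be called
 * once per record, so it only stores the value; currentWatermark and getState are assumed
 * to be called rarely, so that is where the (illustrative) validation runs.
 */
class CheapManualEstimator {
  private long pending;      // last value passed to setWatermark
  private long lastReported; // last value that passed validation

  CheapManualEstimator(long initialWatermarkMillis) {
    this.pending = initialWatermarkMillis;
    this.lastReported = initialWatermarkMillis;
  }

  /** High-volume path: a single field write, no checks. */
  void setWatermark(long watermarkMillis) {
    pending = watermarkMillis;
  }

  /** Low-volume path: validate once (here: must not move backwards), then report. */
  long currentWatermark() {
    if (pending < lastReported) {
      throw new IllegalStateException(
          "Watermark moved backwards from " + lastReported + " to " + pending);
    }
    lastReported = pending;
    return lastReported;
  }

  /** Also low volume (e.g. at checkpoints), so it reuses the validated value. */
  long getState() {
    return currentWatermark();
  }
}

class CheapEstimatorDemo {
  public static void main(String[] args) {
    CheapManualEstimator estimator = new CheapManualEstimator(0L);
    for (long ts = 1; ts <= 1_000_000; ts++) {
      estimator.setWatermark(ts); // per-record calls stay cheap
    }
    System.out.println(estimator.currentWatermark()); // prints 1000000
  }
}
```

The cost of this design is that a backwards step which is undone again between two `currentWatermark` calls goes unnoticed; whether that is acceptable depends on what the validation is meant to catch.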

On Mon, Mar 9, 2020 at 10:18 AM Luke Cwik <lc...@google.com> wrote:


Re: Java SplittableDoFn Watermark API

Posted by Luke Cwik <lc...@google.com>.
The watermark estimators currently used in Apache Beam by UnboundedSources
are:
- SQS: tracks the timestamp of the last unacked message (it does not report
  monotonically increasing watermarks and assumes that the system will
  lower-bound what is being reported)
- AMP: tracks the timestamp of the last message
- GCP Pubsub
  <https://github.com/apache/beam/blob/784d18b7ac89f87dd7fbf2861ee877f5b6070276/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L950>:
  uses the last minute of data to compute the watermark, but also handles
  cases where there is no data for long periods of time
- Kafka: supports a watermark policy function
  <https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicy.java#L62>
  that tracks one of processing time, ingestion time, or a custom user-based
  version that gets to see the individual records
- Kinesis: supports a watermark policy function
  <https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicy.java>
  that tracks one of processing time, arrival time, or a custom user-based
  version that gets to see the individual records
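The Kafka and Kinesis entries share a "policy function" shape: a small, user-chosen object that sees each record and reports a watermark. A hypothetical plain-Java sketch of that shape follows; `Rec`, `RecordWatermarkPolicy`, and `Policies` are illustrative names only and do not reproduce the signatures of Kafka's TimestampPolicy or Kinesis's WatermarkPolicy. Processing time is modeled with an injectable clock so the example stays deterministic.

```java
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;

/** Hypothetical record with an event timestamp and the time it reached the broker. */
final class Rec {
  final long eventTimeMillis;
  final long arrivalTimeMillis;

  Rec(long eventTimeMillis, long arrivalTimeMillis) {
    this.eventTimeMillis = eventTimeMillis;
    this.arrivalTimeMillis = arrivalTimeMillis;
  }
}

/** Hypothetical per-reader policy: sees every record and reports a watermark on demand. */
interface RecordWatermarkPolicy {
  void onRecord(Rec rec);
  long watermark();
}

final class Policies {
  /** Watermark follows a processing-time clock and ignores record contents. */
  static RecordWatermarkPolicy processingTime(LongSupplier clock) {
    return new RecordWatermarkPolicy() {
      @Override public void onRecord(Rec rec) { }
      @Override public long watermark() { return clock.getAsLong(); }
    };
  }

  /** Watermark is the latest arrival (ingestion) time seen so far. */
  static RecordWatermarkPolicy arrivalTime() {
    return custom(rec -> rec.arrivalTimeMillis);
  }

  /** User-supplied: watermark is the maximum of a user function over the records seen. */
  static RecordWatermarkPolicy custom(ToLongFunction<Rec> fn) {
    return new RecordWatermarkPolicy() {
      private long max = Long.MIN_VALUE;
      @Override public void onRecord(Rec rec) { max = Math.max(max, fn.applyAsLong(rec)); }
      @Override public long watermark() { return max; }
    };
  }
}

class PolicyDemo {
  public static void main(String[] args) {
    // A custom policy that lags the largest event time by 5 seconds of allowed lateness.
    RecordWatermarkPolicy lagging = Policies.custom(r -> r.eventTimeMillis - 5_000L);
    lagging.onRecord(new Rec(20_000L, 21_000L));
    lagging.onRecord(new Rec(18_000L, 22_000L));
    System.out.println(lagging.watermark()); // prints 15000
  }
}
```

Handling long idle periods, as the Pubsub entry describes, is deliberately left out of this sketch.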

Has anyone written, or does anyone know of, custom user-based watermark
policy functions that have been implemented for Kafka or Kinesis?

On Mon, Mar 9, 2020 at 9:54 AM Ismaël Mejía <ie...@gmail.com> wrote:

> Thanks for the explanation on Watch + FileIO, it is really clear. Extra
> question related to WatermarkEstimator: is it supposed to be called in
> pipelines at the same exact moments that getWatermark is today for
> Unbounded sources?
>

Yes. DoFn authors will either update it manually to tell us that the
watermark has advanced, or have it updated "automatically" via the
timestamp-observing watermark estimators.
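The two invocation styles in that answer can be sketched as follows. In this hypothetical plain-Java model (`Estimator`, `TimestampObserving`, `Manual`, `ObservingMax`, `ManualValue`, and `OutputReceiver` are simplified stand-ins, not Beam's types), the runner-side output path forwards every output timestamp to estimators that opt in to observation, while a manual estimator changes only when the DoFn code calls `setWatermark` itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base estimator interface (not Beam's). */
interface Estimator {
  long currentWatermark();
}

/** Estimators that want to see every output timestamp. */
interface TimestampObserving extends Estimator {
  void observeTimestamp(long timestampMillis);
}

/** Estimators that the DoFn author updates explicitly. */
interface Manual extends Estimator {
  void setWatermark(long watermarkMillis);
}

class ObservingMax implements TimestampObserving {
  private long watermark = Long.MIN_VALUE;
  @Override public void observeTimestamp(long ts) { watermark = Math.max(watermark, ts); }
  @Override public long currentWatermark() { return watermark; }
}

class ManualValue implements Manual {
  private long watermark = Long.MIN_VALUE;
  @Override public void setWatermark(long wm) { watermark = wm; }
  @Override public long currentWatermark() { return watermark; }
}

/** Stand-in for the runner's output receiver. */
class OutputReceiver {
  private final Estimator estimator;
  final List<Long> emitted = new ArrayList<>();

  OutputReceiver(Estimator estimator) { this.estimator = estimator; }

  void outputWithTimestamp(long timestampMillis) {
    emitted.add(timestampMillis);
    // "Automatic" path: only timestamp-observing estimators see each output.
    if (estimator instanceof TimestampObserving) {
      ((TimestampObserving) estimator).observeTimestamp(timestampMillis);
    }
  }
}

class DispatchDemo {
  public static void main(String[] args) {
    ObservingMax observing = new ObservingMax();
    OutputReceiver out1 = new OutputReceiver(observing);
    out1.outputWithTimestamp(5_000L);
    out1.outputWithTimestamp(7_000L);

    ManualValue manual = new ManualValue();
    OutputReceiver out2 = new OutputReceiver(manual);
    out2.outputWithTimestamp(5_000L);
    manual.setWatermark(4_000L); // the DoFn decides when and how far to advance

    System.out.println(observing.currentWatermark() + " " + manual.currentWatermark()); // prints 7000 4000
  }
}
```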


> (Slightly unrelated) There is an open JIRA for an issue related to Watch
> and watermark progress that we should probably investigate. I will add
> details there and ping you once I have the time to go back to the subject.
> https://issues.apache.org/jira/browse/BEAM-9134


The Watch stuff is built on top of a very experimental version of SDF for
classic runners. I know that some people have used it effectively, though,
with watermark tracking working for them.


> Thanks also for the pointers on the UnboundedSource SDF wrapper.
>
> > Why do you want to evolve a bounded SDF into an unbounded SDF (is the
> > restriction truly unbounded)?
>
> To prove that the idea of using the same API/code for both Bounded and
> Unbounded sources holds true, but the more I think about it, the harder it
> is for me to come up with an example that cannot be built with a
> combination of an Unbounded SDF a la Watch + a Bounded SDF. Can you think
> of any?
>

No but I'm sure someone will think of something eventually.

> > ... Java has been less of a focus since it is the most mature
> > non-portable implementation, but hopefully it will move in that
> > direction soon.
>
> We MUST move in this direction if we want to sell the idea of removing
> non-portable translations to runner authors. There are few numbers on the
> performance of pipelines using portability + the Java SDK Harness, but if
> we take the ValidatesRunner tests as the only case where we have numbers
> (notice that VR uses the EMBEDDED environment, the lowest-overhead one for
> Java runs), the latest execution times in Jenkins are far from good:
>
> - Flink Runner VR #6900: 6 mins 22s vs Portable VR #4332 + #4343: 32 mins
>   (5 times slower)
>
> - Spark Runner VR #6854: 8 mins 45s vs Portable VR #2322: 27 mins
>   (3 times slower)
>
> Of course, we can argue that this use case of short-lived pipelines with
> small data is not the average Beam use case, but the numbers still don't
> look good enough to sell.


I agree.


> On Wed, Mar 4, 2020 at 10:52 PM Luke Cwik <lc...@google.com> wrote:
> >
> >
> >
> > On Wed, Mar 4, 2020 at 7:36 AM Ismaël Mejía <ie...@gmail.com> wrote:
> >>
> >> > I think we should move to a world where *all* runners become portable
> >> > runners. That doesn't mean they all need to use docker images, or even
> >> > gRPC, but I don't think having classical-only or classical-excluded
> >> > features is where we want to be long-term.
> >>
> >> Robert, I agree 100% with you. I dream of the day when classic runners
> >> no longer exist and we no longer have issues like this one (features
> >> that are not available). However, there are still two requirements
> >> before we can abandon them: (1) that the performance overhead is not
> >> considerably bigger for existing users (in particular Java users), and
> >> (2) that the portability abstractions are mature. We are getting there,
> >> but we are not there yet.
> >
> >
> > Dataflow and its internal counterpart (Flume) have had good experience
> > running Go and Python portable pipelines at the same or better
> > performance than the closest non-portable equivalent. Java has been less
> > of a focus since it is the most mature non-portable implementation, but
> > hopefully it will move in that direction soon. This would be a great time
> > for any contributors who are interested in specific runners to help
> > migrate them to portable implementations.
> >
> >>
> >> On Tue, Mar 3, 2020 at 6:57 PM Robert Bradshaw <ro...@google.com> wrote:
> >> >
> >> > On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <ie...@gmail.com> wrote:
> >> > >
> >> > > > the unification of bounded/unbounded within SplittableDoFn has
> >> > > > always been a goal.
> >> > >
> >> > > I am glad to know that my intuition is correct and that this was
> >> > > envisioned; the idea of checkpoints for bounded inputs sounds really
> >> > > useful. Eager to try that in practice.
> >> > >
> >> > > An explicit example (with a WatermarkEstimator for a bounded case)
> >> > > would be really nice to see, for learning purposes. Also, given the
> >> > > unification goal, what if we then align the Bounded SDFs to have
> >> > > similar signatures? I mean the method that returns a continuation,
> >> > > even for the Bounded case.
> >> > >
> >> > > > Currently the watermark that is reported as part of the PollResult
> >> > > > is passed to the ProcessContext.updateWatermark [1, 2] function and
> >> > > > instead that call would be redirected to the
> >> > > > ManualWatermarkEstimator.setWatermark function [3].
> >> > >
> >> > > Is there a JIRA for the Watch adjustments so we don't forget to
> >> > > integrate the WatermarkEstimators in? I am really curious about the
> >> > > implementation, to see if I finally understand the internals of Watch
> >> > > too.
> >> > >
> >> > > Extra question: do you think we can have a naive version of Unbounded
> >> > > SDF like the naive one we have on classical runners (if I understood
> >> > > correctly, the current one is only for portable runners)? I worry
> >> > > about the adoption potential.
> >> >
> >> > I think we should move to a world where *all* runners become portable
> >> > runners. That doesn't mean they all need to use docker images, or
> >> > even gRPC, but I don't think having classical-only or
> >> > classical-excluded features is where we want to be long-term.
> >> >
> >> > > On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <ro...@google.com> wrote:
> >> > > >
> >> > > > I don't have a strong preference between using a provider and
> >> > > > having a set of tightly coupled methods in Java, other than that we
> >> > > > be consistent (and we already use the methods style for
> >> > > > restrictions).
> >> > > >
> >> > > > On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
> >> > > > >
> >> > > > > Jan, there are some parts of Apache Beam that the watermarks
> >> > > > > package will likely rely on (the @Experimental annotation, javadoc
> >> > > > > links), but fundamentally it should not need to rely on core, and
> >> > > > > someone could create a separate package for this.
> >> > > >
> >> > > > I think it does make sense for a set of common watermark trackers
> >> > > > to be shipped with core (e.g. manual, monotonic, and eventually a
> >> > > > probabilistic one).
> >> > > >
> >> > > > > Ismael, the unification of bounded/unbounded within
> >> > > > > SplittableDoFn has always been a goal. There is a set of features
> >> > > > > that BoundedSources are unlikely to use but would still be allowed
> >> > > > > to use. For example, bounded sources may want support for
> >> > > > > checkpointing, since I could foresee a BoundedSource that notices
> >> > > > > that a certain resource has become unavailable and can only
> >> > > > > process it later. The choice of watermark estimator is a likely
> >> > > > > point of difference between bounded and unbounded SDFs: bounded
> >> > > > > SDFs could use a very simple estimator where the watermark is held
> >> > > > > at -infinity and only advances to +infinity once there is no more
> >> > > > > data to process. But even though unbounded SDFs are likely to be
> >> > > > > the most common users of varied watermark estimators, a bounded
> >> > > > > SDF may still want to advance the watermark as it reads records,
> >> > > > > so that more "streaming" runners (for example, micro batch) could
> >> > > > > process the entire pipeline in parallel, whereas other runners
> >> > > > > execute one whole segment of the pipeline at a time.
> >> > > >
> >> > > > Put another way, the value of watermark trackers is to allow
> >> > > > processing to continue downstream before the source has completed
> >> > > > reading. This is of course essential for streaming, but if the
> >> > > > source is read to completion before downstream stages start (as is
> >> > > > the case for most batch runners), it is not needed. What this
> >> > > > unification does allow, however, is for a source to be written in
> >> > > > such a way that it can be efficiently used in both batch and
> >> > > > streaming mode.
> >> > > >
> >> > > > > Currently the watermark that is reported as part of the
> PollResult is passed to the ProcessContext.updateWatermark [1, 2] function
> and instead that call would be redirected to the
> ManualWatermarkEstimator.setWatermark function [3].
> >> > > > >
> >> > > > > 1:
> https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> >> > > > > 2:
> https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> >> > > > > 3:
> https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
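A hedged sketch of the redirection described above, assuming a much simplified model: the polling step forwards the watermark carried by each poll result to a manual estimator instead of calling `ProcessContext.updateWatermark`. `ManualEstimator`, `PollOutcome` and `PollingReader.pollOnce` are hypothetical stand-ins for `ManualWatermarkEstimator` and `Watch`'s `PollResult`; they do not reproduce the real signatures.

```java
// Hypothetical stand-in for a manual watermark estimator: the SDF author sets
// the watermark explicitly and the framework only reads it.
class ManualEstimator {
  private long watermark;

  ManualEstimator(long initialWatermark) {
    this.watermark = initialWatermark;
  }

  void setWatermark(long watermarkMillis) {
    this.watermark = watermarkMillis;
  }

  long currentWatermark() {
    return watermark;
  }
}

// Hypothetical, simplified poll result: newly discovered outputs plus an
// optional watermark (null when the poll function reported none).
class PollOutcome {
  final java.util.List<String> outputs;
  final Long watermark;

  PollOutcome(java.util.List<String> outputs, Long watermark) {
    this.outputs = outputs;
    this.watermark = watermark;
  }
}

class PollingReader {
  // One polling round: emit the outputs, then hand the poll's watermark (if
  // any) to the manual estimator rather than a ProcessContext-style call.
  static void pollOnce(PollOutcome result, ManualEstimator estimator, java.util.List<String> sink) {
    sink.addAll(result.outputs);
    if (result.watermark != null) {
      estimator.setWatermark(result.watermark);
    }
  }
}
```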
> >> > > > >
> >> > > > > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ie...@gmail.com>
> wrote:
> >> > > > >>
> >> > > > >> I just realized that the HBaseIO example is not a good one because we can
> >> > > > >> already have Watch-like behavior, as we do for partition discovery in HCatalogIO.
> >> > > > >> Still, I am interested in your views on bounded/unbounded unification.
> >> > > > >>
> >> > > > >> Interesting question 2: How will these annotations connect with the Watch
> >> > > > >> transform polling patterns?
> >> > > > >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
> >> > > > >>
> >> > > > >>
> >> > > > >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <
> iemejia@gmail.com> wrote:
> >> > > > >>>
> >> > > > >>> Really interesting! Implementing the watermark correctly has been a common
> >> > > > >>> struggle for IO authors, to the point that some IOs still have issues around
> >> > > > >>> that. So +1 for this, in particular if we can reuse common patterns.
> >> > > > >>> I was not aware of Boyuan's work around this, really nice.
> >> > > > >>>
> >> > > > >>> One aspect I have always been confused about since I read the SDF proposal
> >> > > > >>> documents is whether we could have a single API for both Bounded and Unbounded
> >> > > > >>> IO by somehow treating a BoundedSDF as a special case of an UnboundedSDF.
> >> > > > >>> Could WatermarkEstimator help in this direction?
> >> > > > >>>
> >> > > > >>> One quick case that I can think of is making the current HBaseIO SDF work in an
> >> > > > >>> unbounded manner, for example to 'watch and read new tables'.
> >> > > > >>>
> >> > > > >>>
> >> > > > >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com>
> wrote:
> >> > > > >>>>
> >> > > > >>>> See this doc [1] and blog [2] for some context about
> SplittableDoFns.
> >> > > > >>>>
> >> > > > >>>> To support watermark reporting within the Java SDK for
> SplittableDoFns, we need a way for SDF authors to report watermark
> estimates over the element and restriction pair that they are processing.
> >> > > > >>>>
> >> > > > >>>> For UnboundedSources, it was found to be a pain point to ask
> each SDF author to write their own watermark estimation, which typically
> prevented reuse. Therefore we would like to have a "library" of watermark
> estimators that help SDF authors perform this estimation, similar to how
> there is a "library" of restrictions and restriction trackers that SDF
> authors can use. SDF authors for whom the existing library doesn't work
> can add additional ones that observe timestamps of elements or choose
> to directly report the watermark through a "ManualWatermarkEstimator"
> parameter that can be supplied to @ProcessElement methods.
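As a hedged illustration of the "library" idea, the sketch below shows a reusable estimator that the framework feeds with output timestamps, so the SDF author never computes a watermark by hand. It is loosely modeled on the monotonic kind of estimator mentioned in this thread; `MonotonicEstimator` and `ObservingOutput` are hypothetical names, not Beam's classes.

```java
// Hypothetical "library" estimator: the framework reports each output's
// timestamp, and the estimator derives the watermark from them.
class MonotonicEstimator {
  private long watermark;

  MonotonicEstimator(long initialWatermark) {
    this.watermark = initialWatermark;
  }

  // Invoked by the (hypothetical) framework for each output timestamp.
  void observeTimestamp(long timestampMillis) {
    // Never move the watermark backwards.
    watermark = Math.max(watermark, timestampMillis);
  }

  long currentWatermark() {
    return watermark;
  }
}

// Hypothetical framework hook: wraps the author's output calls so that every
// emitted record's timestamp is reported to the estimator automatically.
class ObservingOutput {
  private final MonotonicEstimator estimator;
  final java.util.List<String> records = new java.util.ArrayList<>();

  ObservingOutput(MonotonicEstimator estimator) {
    this.estimator = estimator;
  }

  void outputWithTimestamp(String record, long timestampMillis) {
    records.add(record);
    estimator.observeTimestamp(timestampMillis);
  }
}
```

The design point is that the author's code only calls `outputWithTimestamp`; swapping in a different estimator changes the watermark policy without touching the processing logic.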
> >> > > > >>>>
> >> > > > >>>> The public-facing portion of the DoFn changes adds three new
> annotations for new DoFn-style methods:
> >> > > > >>>> GetInitialWatermarkEstimatorState: Returns the initial
> watermark estimator state, similar to GetInitialRestriction.
> >> > > > >>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible
> with the watermark estimator state type, similar to GetRestrictionCoder for
> restrictions returned by GetInitialRestriction.
> >> > > > >>>> NewWatermarkEstimator: Returns either a watermark estimator
> that the framework invokes, allowing it to observe the timestamps of
> output records, or a manual watermark estimator that can be explicitly
> invoked to update the watermark.
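A hedged sketch of how the three proposed methods could fit together for one element/restriction pair. The method names mirror the annotations, but the types (`LongStateCoder`, `StatefulEstimator`, `ExampleSdf`) are hypothetical plain-Java simplifications, not the signatures from the PR. Here the estimator state is just the last reported watermark, which the coder turns into bytes so processing can resume after a checkpoint.

```java
// Hypothetical coder for the estimator state (a long watermark in millis).
class LongStateCoder {
  byte[] encode(long state) {
    return java.nio.ByteBuffer.allocate(Long.BYTES).putLong(state).array();
  }

  long decode(byte[] bytes) {
    return java.nio.ByteBuffer.wrap(bytes).getLong();
  }
}

// Hypothetical estimator whose entire state is its current watermark.
class StatefulEstimator {
  private long watermark;

  StatefulEstimator(long state) {
    this.watermark = state;
  }

  void setWatermark(long watermarkMillis) {
    watermark = watermarkMillis;
  }

  long currentWatermark() {
    return watermark;
  }

  // Snapshot taken when the runner checkpoints or splits the restriction.
  long getState() {
    return watermark;
  }
}

// Hypothetical SDF-like class exposing the three proposed methods.
class ExampleSdf {
  // Plays the role of @GetInitialWatermarkEstimatorState.
  long getInitialWatermarkEstimatorState(long elementTimestampMillis) {
    return elementTimestampMillis;
  }

  // Plays the role of @GetWatermarkEstimatorStateCoder.
  LongStateCoder getWatermarkEstimatorStateCoder() {
    return new LongStateCoder();
  }

  // Plays the role of @NewWatermarkEstimator.
  StatefulEstimator newWatermarkEstimator(long state) {
    return new StatefulEstimator(state);
  }
}
```

Persisting the state through a coder is what lets the watermark survive a split or checkpoint, in the same way a restriction coder lets the restriction itself survive.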
> >> > > > >>>>
> >> > > > >>>> See [3] for an initial PR with the public facing additions
> to the core Java API related to SplittableDoFn.
> >> > > > >>>>
> >> > > > >>>> This mirrors a bunch of work that was done by Boyuan within
> the Python SDK [4, 5] but in the style of the new DoFn parameter/method
> invocation we have in the Java SDK.
> >> > > > >>>>
> >> > > > >>>> 1: https://s.apache.org/splittable-do-fn
> >> > > > >>>> 2:
> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> >> > > > >>>> 3: https://github.com/apache/beam/pull/10992
> >> > > > >>>> 4: https://github.com/apache/beam/pull/9794
> >> > > > >>>> 5: https://github.com/apache/beam/pull/10375
>

Re: Java SplittableDoFn Watermark API

Posted by Ismaël Mejía <ie...@gmail.com>.
Thanks for the explanation of Watch + FileIO; it is really clear. An extra question
related to WatermarkEstimator: is it supposed to be called in pipelines at exactly
the same moments that getWatermark is called today for Unbounded sources?

(Slightly unrelated) There is an open JIRA for an issue related to Watch and
watermark progress that we should probably investigate. I will add details there
and ping you once I have time to get back to the subject.
https://issues.apache.org/jira/browse/BEAM-9134

Thanks also for the pointers on the UnboundedSource SDF wrapper.

> Why do you want to evolve a bounded SDF into an unbounded SDF (is the
> restriction truly unbounded)?

To prove that the idea of using the same API/code for both Bounded and Unbounded
sources holds true. But the more I think about it, the harder it is for me to
find an example that cannot be built with a combination of an Unbounded SDF
à la Watch + a Bounded SDF. Can you think of any?

> ... Java has been less of a focus since it is the most mature non-portable
> implementation, but it will hopefully move in that direction soon.

We MUST move in this direction if we want to sell runner authors on the idea of
removing non-portable translations. There are few numbers on the performance of
pipelines using portability + the Java SDK Harness, but if we take the
ValidatesRunner (VR) tests as the only case where we have numbers (note that VR
uses the EMBEDDED environment, the lowest-overhead one for Java runs), the latest
execution times in Jenkins are far from good:

- Flink Runner VR #6900: 6 mins 22s vs Portable VR #4332 + #4343: 32 mins
  (5 times slower)

- Spark Runner VR #6854: 8 mins 45s vs Portable VR #2322: 27 mins
  (3 times slower)
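The slowdown factors above can be checked directly from the quoted wall-clock times once they are converted to seconds. This small helper is only an arithmetic check of the figures in this message, not a benchmark.

```java
// Arithmetic check of the ValidatesRunner timings quoted above.
class SlowdownCheck {
  static long seconds(int minutes, int secs) {
    return minutes * 60L + secs;
  }

  // Ratio of portable runtime to classic runtime.
  static double slowdown(long classicSeconds, long portableSeconds) {
    return (double) portableSeconds / classicSeconds;
  }
}
```

Flink: 1920 s / 382 s is about 5.0; Spark: 1620 s / 525 s is about 3.1, consistent with the rounded factors given.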

Of course, we can argue that this use case of short-lived pipelines over small
data is not the average Beam use case, but the numbers still don't look good
enough to sell.

On Wed, Mar 4, 2020 at 10:52 PM Luke Cwik <lc...@google.com> wrote:
>
>
>
> On Wed, Mar 4, 2020 at 7:36 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>
>> > I think we should move to a world where *all* runners become portable runners.
>> > That doesn't mean they all need to use Docker images, or even gRPC, but I
>> > don't think having classical-only or classical-excluded features is where we
>> > want to be long-term.
>>
>> Robert, I agree 100% with you. I dream of the day when classic runners do not
>> exist anymore and we no longer have issues like this one (features that are not
>> available); however, there are still two requirements before abandoning them:
>> (1) that the performance overhead is not considerably bigger for existing users
>> (in particular Java users) and (2) that the portability abstractions are mature.
>> We are getting there, but we are not there yet.
>
>
> Dataflow and its internal counterpart (Flume) have had a good experience running Go and Python portable pipelines at the same or better performance than the closest non-portable equivalent. Java has been less of a focus since it is the most mature non-portable implementation, but it will hopefully move in that direction soon. This would be a great time for any contributors who are interested in specific runners to help migrate them to portable implementations.
>
>>
>> On Tue, Mar 3, 2020 at 6:57 PM Robert Bradshaw <ro...@google.com> wrote:
>> >
>> > On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <ie...@gmail.com> wrote:
>> > >
>> > > > the unification of bounded/unbounded within SplittableDoFn has always been a goal.
>> > >
>> > > I am glad to know that my intuition is correct and that this was envisioned; the
>> > > idea of checkpoints for bounded inputs sounds really useful. Eager to try
>> > > that in practice.
>> > >
>> > > An explicit example (with a WatermarkEstimator for a bounded case) would be
>> > > really nice to see, for learning purposes. Also, given the unification goal, what
>> > > if we then align the Bounded SDFs to have similar signatures? I mean the
>> > > method that returns a continuation, even for the Bounded case.
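For the bounded checkpointing case discussed in this thread (a BoundedSource that notices a resource is unavailable and processes it later), a hedged plain-Java sketch of a bounded reader that returns a continuation might look like this. `Continuation`, `ProcessResult` and `BoundedResourceReader` are hypothetical; Beam's real mechanism (`DoFn.ProcessContinuation` together with a restriction tracker) differs in detail.

```java
// Hypothetical result type: STOP means the restriction is finished; RESUME
// means "checkpoint here and call me again later with the remaining work".
enum Continuation { STOP, RESUME }

class ProcessResult {
  final Continuation continuation;
  // Index of the first unprocessed resource; only meaningful for RESUME.
  final int resumeIndex;

  ProcessResult(Continuation continuation, int resumeIndex) {
    this.continuation = continuation;
    this.resumeIndex = resumeIndex;
  }
}

// Hypothetical bounded reader over a fixed list of resources (e.g. files).
class BoundedResourceReader {
  // Must be a mutable set, since markAvailable removes entries from it.
  private final java.util.Set<String> unavailable;
  final java.util.List<String> processed = new java.util.ArrayList<>();

  BoundedResourceReader(java.util.Set<String> unavailable) {
    this.unavailable = unavailable;
  }

  void markAvailable(String resource) {
    unavailable.remove(resource);
  }

  ProcessResult process(java.util.List<String> resources, int from) {
    for (int i = from; i < resources.size(); i++) {
      String r = resources.get(i);
      if (unavailable.contains(r)) {
        // Cannot read this resource yet: checkpoint instead of failing.
        return new ProcessResult(Continuation.RESUME, i);
      }
      processed.add(r);
    }
    return new ProcessResult(Continuation.STOP, resources.size());
  }
}
```

The input is still finite; returning a continuation only lets the runner defer the rest of the work, which is why the same signature could plausibly serve both bounded and unbounded SDFs.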
>> > >
>> > > > Currently the watermark that is reported as part of the PollResult is passed
>> > > > to the ProcessContext.updateWatermark [1, 2] function and instead that call
>> > > > would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
>> > >
>> > > Is there a JIRA for the Watch adjustments so we don't forget to integrate the
>> > > WatermarkEstimators? I am really curious about the implementation, to see if I
>> > > finally understand the internals of Watch too.
>> > >
>> > > Extra question: Do you think we can have a naive version of Unbounded SDF like
>> > > the naive one we have on classical runners (if I understood correctly, the
>> > > current one is only for portable runners)? I worry about the adoption potential.
>> >
>> > I think we should move to a world where *all* runners become portable
>> > runners. That doesn't mean they all need to use Docker images, or
>> > even gRPC, but I don't think having classical-only or
>> > classical-excluded features is where we want to be long-term.
>> >
>> > > On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <ro...@google.com> wrote:
>> > > >
>> > > > I don't have a strong preference for using a provider/having a set of
>> > > > tightly coupled methods in Java, other than that we be consistent (and
>> > > > we already use the methods style for restrictions).
>> > > >
>> > > > On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
>> > > > >
>> > > > > Jan, there are some parts of Apache Beam the watermarks package will likely rely on (@Experimental annotation, javadoc links), but fundamentally it should not rely on core, and someone could create a separate package for this.
>> > > >
>> > > > I think it does make sense for a set of common watermark trackers to
>> > > > be shipped with core (e.g. manual, monotonic, and eventually a
>> > > > probabilistic one).
>> > > >
>> > > > > Ismael, the unification of bounded/unbounded within SplittableDoFn has always been a goal. There is a set of features that BoundedSources are unlikely to use but would still be allowed to use. For example, bounded sources may want to have support for checkpointing since I could foresee a BoundedSource that can notice that a certain resource becomes unavailable and can only process it later. The choice of which watermark estimator to use is a likely point of difference between bounded and unbounded SDFs since bounded SDFs would be able to use a very simple estimator where the watermark is held at -infinity and only advances to +infinity once there is no more data to process. But even though unbounded SDFs are likely to be the most common users of varied watermark estimators, a bounded SDF may still want to advance the watermark as it reads records so that runners that are more "streaming" (for example micro-batch) could process the entire pipeline in parallel, as opposed to runners that execute one whole segment of the pipeline at a time.
>> > > >
>> > > > Put another way, the value of watermark trackers is to allow
>> > > > processing to continue downstream before the source has completed
>> > > > reading. This is of course essential for streaming, but if the source
>> > > > is read to completion before downstream stages start (as is the case
>> > > > for most batch runners) it is not needed. What this unification does
>> > > > allow, however, is for a source to be written in such a way that it can be
>> > > > efficiently used in both batch and streaming mode.
>> > > >
>> > > > > Currently the watermark that is reported as part of the PollResult is passed to the ProcessContext.updateWatermark [1, 2] function and instead that call would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
>> > > > >
>> > > > > 1: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
>> > > > > 2: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
>> > > > > 3: https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
>> > > > >
>> > > > > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ie...@gmail.com> wrote:
>> > > > >>
>> > > > >> I just realized that the HBaseIO example is not a good one because we can
>> > > > >> already have Watch-like behavior, as we do for partition discovery in HCatalogIO.
>> > > > >> Still, I am interested in your views on bounded/unbounded unification.
>> > > > >>
>> > > > >> Interesting question 2: How will these annotations connect with the Watch
>> > > > >> transform polling patterns?
>> > > > >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
>> > > > >>
>> > > > >>
>> > > > >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ie...@gmail.com> wrote:
>> > > > >>>
>> > > > >>> Really interesting! Implementing the watermark correctly has been a common
>> > > > >>> struggle for IO authors, to the point that some IOs still have issues around
>> > > > >>> that. So +1 for this, in particular if we can reuse common patterns.
>> > > > >>> I was not aware of Boyuan's work around this, really nice.
>> > > > >>>
>> > > > >>> One aspect I have always been confused about since I read the SDF proposal
>> > > > >>> documents is whether we could have a single API for both Bounded and Unbounded
>> > > > >>> IO by somehow treating a BoundedSDF as a special case of an UnboundedSDF.
>> > > > >>> Could WatermarkEstimator help in this direction?
>> > > > >>>
>> > > > >>> One quick case that I can think of is making the current HBaseIO SDF work in an
>> > > > >>> unbounded manner, for example to 'watch and read new tables'.
>> > > > >>>
>> > > > >>>
>> > > > >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
>> > > > >>>>
>> > > > >>>> See this doc [1] and blog [2] for some context about SplittableDoFns.
>> > > > >>>>
>> > > > >>>> To support watermark reporting within the Java SDK for SplittableDoFns, we need a way for SDF authors to report watermark estimates over the element and restriction pair that they are processing.
>> > > > >>>>
>> > > > >>>> For UnboundedSources, it was found to be a pain point to ask each SDF author to write their own watermark estimation, which typically prevented reuse. Therefore we would like to have a "library" of watermark estimators that help SDF authors perform this estimation, similar to how there is a "library" of restrictions and restriction trackers that SDF authors can use. SDF authors for whom the existing library doesn't work can add additional ones that observe timestamps of elements or choose to directly report the watermark through a "ManualWatermarkEstimator" parameter that can be supplied to @ProcessElement methods.
>> > > > >>>>
>> > > > >>>> The public-facing portion of the DoFn changes adds three new annotations for new DoFn-style methods:
>> > > > >>>> GetInitialWatermarkEstimatorState: Returns the initial watermark estimator state, similar to GetInitialRestriction.
>> > > > >>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with the watermark estimator state type, similar to GetRestrictionCoder for restrictions returned by GetInitialRestriction.
>> > > > >>>> NewWatermarkEstimator: Returns either a watermark estimator that the framework invokes, allowing it to observe the timestamps of output records, or a manual watermark estimator that can be explicitly invoked to update the watermark.
>> > > > >>>>
>> > > > >>>> See [3] for an initial PR with the public facing additions to the core Java API related to SplittableDoFn.
>> > > > >>>>
>> > > > >>>> This mirrors a bunch of work that was done by Boyuan within the Python SDK [4, 5] but in the style of the new DoFn parameter/method invocation we have in the Java SDK.
>> > > > >>>>
>> > > > >>>> 1: https://s.apache.org/splittable-do-fn
>> > > > >>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>> > > > >>>> 3: https://github.com/apache/beam/pull/10992
>> > > > >>>> 4: https://github.com/apache/beam/pull/9794
>> > > > >>>> 5: https://github.com/apache/beam/pull/10375

Re: Java SplittableDoFn Watermark API

Posted by Luke Cwik <lc...@google.com>.
On Wed, Mar 4, 2020 at 7:36 AM Ismaël Mejía <ie...@gmail.com> wrote:

> > I think we should move to a world where *all* runners become portable runners.
> > That doesn't mean they all need to use Docker images, or even gRPC, but I
> > don't think having classical-only or classical-excluded features is where we
> > want to be long-term.
>
> Robert, I agree 100% with you. I dream of the day when classic runners do not
> exist anymore and we no longer have issues like this one (features that are not
> available); however, there are still two requirements before abandoning them:
> (1) that the performance overhead is not considerably bigger for existing users
> (in particular Java users) and (2) that the portability abstractions are mature.
> We are getting there, but we are not there yet.
>

Dataflow and its internal counterpart (Flume) have had a good experience
running Go and Python portable pipelines at the same or better performance
than the closest non-portable equivalent. Java has been less of a focus
since it is the most mature non-portable implementation, but it will
hopefully move in that direction soon. This would be a great time for
any contributors who are interested in specific runners to help migrate
them to portable implementations.



Re: Java SplittableDoFn Watermark API

Posted by Ismaël Mejía <ie...@gmail.com>.
> I think we should move to a world where *all* runners become portable runners.
> That doesn't mean they all need to use Docker images, or even gRPC, but I
> don't think having classical-only or classical-excluded features is where we
> want to be long-term.

Robert, I agree 100% with you. I dream of the day when classic runners do not
exist anymore and we no longer have issues like this one (features that are not
available); however, there are still two requirements before abandoning them:
(1) that the performance overhead is not considerably bigger for existing users
(in particular Java users) and (2) that the portability abstractions are mature.
We are getting there, but we are not there yet.

On Tue, Mar 3, 2020 at 6:57 PM Robert Bradshaw <ro...@google.com> wrote:
>
> On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <ie...@gmail.com> wrote:
> >
> > > the unification of bounded/unbounded within SplittableDoFn has always been a goal.
> >
> > I am glad to know that my intuition is correct and that this was envisioned; the
> > idea of checkpoints for bounded inputs sounds really useful. Eager to try
> > that in practice.
> >
> > An explicit example (with a WatermarkEstimator for a bounded case) would be
> > really nice to see, for learning purposes. Also, given the unification goal, what
> > if we then align the Bounded SDFs to have similar signatures? I mean the
> > method that returns a continuation, even for the Bounded case.
> >
> > > Currently the watermark that is reported as part of the PollResult is passed
> > > to the ProcessContext.updateWatermark [1, 2] function and instead that call
> > > would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
> >
> > Is there a JIRA for the Watch adjustments so we don't forget to integrate the
> > WatermarkEstimators? I am really curious about the implementation, to see if I
> > finally understand the internals of Watch too.
> >
> > Extra question: Do you think we can have a naive version of Unbounded SDF like
> > the naive one we have on classical runners (if I understood correctly, the
> > current one is only for portable runners)? I worry about the adoption potential.
>
> I think we should move to a world where *all* runners become portable
> runners. That doesn't mean they all need to use Docker images, or
> even gRPC, but I don't think having classical-only or
> classical-excluded features is where we want to be long-term.
>
> > On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <ro...@google.com> wrote:
> > >
> > > I don't have a strong preference for using a provider/having a set of
> > > tightly coupled methods in Java, other than that we be consistent (and
> > > we already use the methods style for restrictions).
> > >
> > > On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
> > > >
> > > > Jan, there are some parts of Apache Beam the watermarks package will likely rely on (@Experimental annotation, javadoc links), but fundamentally it should not rely on core, and someone could create a separate package for this.
> > >
> > > I think it does make sense for a set of common watermark trackers to
> > > be shipped with core (e.g. manual, monotonic, and eventually a
> > > probabilistic one).
> > >
> > > > Ismael, the unification of bounded/unbounded within SplittableDoFn has always been a goal. There is a set of features that BoundedSources are unlikely to use but would still be allowed to use. For example, bounded sources may want to have support for checkpointing since I could foresee a BoundedSource that can notice that a certain resource becomes unavailable and can only process it later. The choice of which watermark estimator to use is a likely point of difference between bounded and unbounded SDFs since bounded SDFs would be able to use a very simple estimator where the watermark is held at -infinity and only advances to +infinity once there is no more data to process. But even though unbounded SDFs are likely to be the most common users of varied watermark estimators, a bounded SDF may still want to advance the watermark as it reads records so that runners that are more "streaming" (for example micro-batch) could process the entire pipeline in parallel, as opposed to runners that execute one whole segment of the pipeline at a time.
> > >
> > > Put another way, the value of watermark trackers is to allow
> > > processing to continue downstream before the source has completed
> > > reading. This is of course essential for streaming, but if the source
> > > is read to completion before downstream stages start (as is the case
> > > for most batch runners) it is not needed. What this unification does
> > > allow, however, is for a source to be written in such a way that it can be
> > > efficiently used in both batch and streaming mode.
> > >
> > > > Currently the watermark that is reported as part of the PollResult is passed to the ProcessContext.updateWatermark [1, 2] function and instead that call would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
> > > >
> > > > 1: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> > > > 2: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> > > > 3: https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
> > > >
> > > > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ie...@gmail.com> wrote:
> > > >>
> > > >> I just realized that the HBaseIO example is not a good one, because we can
> > > >> already get Watch-like behavior, as we do for partition discovery in HCatalogIO.
> > > >> Still, I am interested in your views on bounded/unbounded unification.
> > > >>
> > > >> Interesting question 2: How will these annotations connect with the Watch
> > > >> transform polling patterns?
> > > >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
> > > >>
> > > >>
> > > >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ie...@gmail.com> wrote:
> > > >>>
> > > >>> Really interesting! Implementing the watermark correctly has been a common
> > > >>> struggle for IO authors, to the point that some IOs still have issues around
> > > >>> it. So +1 for this, in particular if we can reuse common patterns.
> > > >>> I was not aware of Boyuan's work around this, really nice.
> > > >>>
> > > >>> One aspect I have always been confused about since I read the SDF proposal
> > > >>> documents is whether we could have a single API for both bounded and unbounded
> > > >>> IO by treating a bounded SDF as a special case of an unbounded SDF.
> > > >>> Could WatermarkEstimator help in this direction?
> > > >>>
> > > >>> One quick case I can think of is making the current HBaseIO SDF work in an
> > > >>> unbounded manner, for example to 'watch and read new tables'.
> > > >>>
> > > >>>
> > > >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
> > > >>>>
> > > >>>> See this doc[1] and blog[2] for some context about SplittableDoFns.
> > > >>>>
> > > >>>> To support watermark reporting for SplittableDoFns within the Java SDK, we need a way for SDF authors to report watermark estimates over the element and restriction pair that they are processing.
> > > >>>>
> > > >>>> For UnboundedSources, asking each SDF author to write their own watermark estimation was found to be a pain point, and it typically prevented re-use. Therefore we would like to have a "library" of watermark estimators that help SDF authors perform this estimation, similar to the "library" of restrictions and restriction trackers that SDF authors can use. SDF authors for whom the existing library doesn't work can add additional estimators that observe the timestamps of elements, or can choose to report the watermark directly through a "ManualWatermarkEstimator" parameter that can be supplied to @ProcessElement methods.
> > > >>>>
> > > >>>> The public-facing portion of the DoFn changes adds three new annotations for new DoFn-style methods:
> > > >>>> GetInitialWatermarkEstimatorState: Returns the initial watermark estimator state, similar to GetInitialRestriction.
> > > >>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with the watermark estimator state type, similar to GetRestrictionCoder for restrictions returned by GetInitialRestriction.
> > > >>>> NewWatermarkEstimator: Returns either a watermark estimator that the framework invokes, allowing it to observe the timestamps of output records, or a manual watermark estimator that can be explicitly invoked to update the watermark.
> > > >>>>
> > > >>>> See [3] for an initial PR with the public-facing additions to the core Java API related to SplittableDoFn.
> > > >>>>
> > > >>>> This mirrors a lot of work that Boyuan did within the Python SDK [4, 5], but in the style of the new DoFn parameter/method invocation we have in the Java SDK.
> > > >>>>
> > > >>>> 1: https://s.apache.org/splittable-do-fn
> > > >>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> > > >>>> 3: https://github.com/apache/beam/pull/10992
> > > >>>> 4: https://github.com/apache/beam/pull/9794
> > > >>>> 5: https://github.com/apache/beam/pull/10375
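The lifecycle implied by the three proposed methods (plus the manual estimator parameter) can be modeled in plain Java: the runner obtains initial estimator state, builds an estimator from that state for each bundle, lets it observe output timestamps (or lets the DoFn set the watermark directly), and persists the estimator's state on checkpoint. This is a hypothetical, dependency-free sketch, not the API in PR 10992: `EstimatorModel`, `MonotonicEstimator`, `ManualEstimator`, `HoldUntilDoneEstimator` and `EstimatorLifecycle` are illustrative names, and watermarks are plain epoch millis with Long.MIN_VALUE/Long.MAX_VALUE standing in for -infinity/+infinity.

```java
import java.util.List;

// Hypothetical stand-in for a watermark estimator; NOT the Beam interface.
interface EstimatorModel {
  long currentWatermark();
  long getState(); // what a checkpoint would persist (encoded by the state coder)
}

// Framework-observed style: watches output timestamps and never moves backwards.
class MonotonicEstimator implements EstimatorModel {
  private long watermark;
  MonotonicEstimator(long initialState) { this.watermark = initialState; }
  void observeTimestamp(long timestamp) { watermark = Math.max(watermark, timestamp); }
  @Override public long currentWatermark() { return watermark; }
  @Override public long getState() { return watermark; }
}

// Manual style: the DoFn sets the watermark itself, e.g. from a Watch poll result.
class ManualEstimator implements EstimatorModel {
  private long watermark;
  ManualEstimator(long initialState) { this.watermark = initialState; }
  void setWatermark(long newWatermark) { watermark = newWatermark; } // no monotonicity check here
  @Override public long currentWatermark() { return watermark; }
  @Override public long getState() { return watermark; }
}

// Simple bounded style: held at -infinity until all data is read, then +infinity.
class HoldUntilDoneEstimator implements EstimatorModel {
  private boolean done;
  void markDone() { done = true; }
  @Override public long currentWatermark() { return done ? Long.MAX_VALUE : Long.MIN_VALUE; }
  @Override public long getState() { return done ? 1L : 0L; } // state is just the "done" flag
}

class EstimatorLifecycle {
  // Plays the role of @GetInitialWatermarkEstimatorState.
  static long initialState() {
    return Long.MIN_VALUE;
  }

  // Plays the role of @NewWatermarkEstimator: rebuild an estimator from saved state.
  static MonotonicEstimator newEstimator(long state) {
    return new MonotonicEstimator(state);
  }

  // One bundle: emit records, let the estimator observe them, return checkpoint state.
  static long processBundle(long state, List<Long> outputTimestamps) {
    MonotonicEstimator estimator = newEstimator(state);
    for (long ts : outputTimestamps) {
      estimator.observeTimestamp(ts);
    }
    return estimator.getState();
  }

  public static void main(String[] args) {
    long state = initialState();
    state = processBundle(state, List.of(100L, 250L, 200L));
    System.out.println(state); // 250
    state = processBundle(state, List.of(180L, 300L)); // resumed from checkpoint
    System.out.println(state); // 300
  }
}
```

Because the state in this model is a single long, the coder that GetWatermarkEstimatorStateCoder would supply is trivial; an estimator with richer state would need a matching coder.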

Re: Java SplittableDoFn Watermark API

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <ie...@gmail.com> wrote:
>
> > the unification of bounded/unbounded within SplittableDoFn has always been a goal.
>
> I am glad to know that my intuition is correct and that this was envisioned; the
> idea of checkpoints for bounded inputs sounds really useful. Eager to try
> that in practice.
>
> An explicit example (with a WatermarkEstimator for a bounded case) would be
> really nice to see, for learning purposes. Also, given the unification goal, what
> if we aligned bounded SDFs to have similar signatures, i.e. a process
> method that returns a continuation even for the bounded case?
>
> > Currently the watermark that is reported as part of the PollResult is passed
> > to the ProcessContext.updateWatermark [1, 2] function and instead that call
> > would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
>
> Is there a JIRA for the Watch adjustments so we don't forget to integrate the
> WatermarkEstimators? I am really curious about the implementation, to see if I
> finally understand the internals of Watch too.
>
> Extra question: Do you think we can have a naive version of unbounded SDF on
> classical runners, like the naive one we have for bounded SDF (if I understood
> correctly, the current one is only for portable runners)? I worry about the adoption potential.

I think we should move to a world where *all* runners become portable
runners. That doesn't mean they all need to use Docker images, or
even gRPC, but I don't think having classical-only or
classical-excluded features is where we want to be long-term.


Re: Java SplittableDoFn Watermark API

Posted by Luke Cwik <lc...@google.com>.
On Wed, Mar 4, 2020 at 7:37 AM Ismaël Mejía <ie...@gmail.com> wrote:

> > Bounded SDFs are allowed to have a method signature which has void as the
> > return type OR a ProcessContinuation. Unbounded SDFs must use a
> > ProcessContinuation as the return type. The "void" return case improves ease
> > of use since it is likely to be the common case for bounded SDFs.
>
> Luke, thanks for the answer. So I can assume that in the processElement of an
> existing bounded SDF like HBaseReadSplittableDoFn I can also return a
> ProcessContinuation? If so, maybe it is worth making this explicit (at least in
> some example) instead of relying on the 'ease of use' approach. To clarify my
> point (and to connect it with the WatermarkEstimator concept), what I am trying
> to understand is how to evolve an existing bounded SDF into an unbounded one. Is
> it just the signature plus assigning a different WatermarkEstimator? Again, I
> think an example is worth a thousand words.
>

Why do you want to evolve a bounded SDF into an unbounded SDF (is the
restriction truly unbounded)?

For example, FileIO should be a bounded SDF while Watch should be
unbounded. The combination of the two (Watch followed by FileIO) leads to
an unbounded pipeline. How does the watermark integrate in this case?
If FileIO doesn't report a watermark, then the records will all be produced
at the timestamp of the "file descriptor" that Watch produced, and FileIO
will effectively track the watermark of Watch. However, let's say Watch is
scanning a directory for hourly log files; then FileIO could hold back the
watermark based upon the data within those files. For example:
- Watch starts scanning files in the range [1pm, 2pm) and reports a
  watermark of 1pm.
- Each FileIO instance starts reading its file descriptor and reports a
  watermark that increases monotonically within the range [1pm, 2pm).
- Even though all known FileIO instances may be reporting watermarks later
  than 1pm, the system holds the watermark downstream of FileIO back at 1pm,
  since Watch could still produce a new file descriptor whose records at 1pm
  would not be late.
- Let's say that Watch finishes scanning the [1pm, 2pm) range, starts
  scanning the [2pm, 3pm) range, and advances its watermark to 2pm, while
  FileIO has started processing all known inputs that were produced at the
  1pm watermark.
- This allows the watermark downstream of FileIO to advance to the minimum
  of all watermarks reported by the FileIO "file descriptors", up to 2pm,
  since that is the upper bound the upstream Watch transform still imposes.
Note that in this example, it is important that Watch advance the watermark
to 2pm as soon as it thinks/knows there will be no more files for that range.
If Watch advances too early and a file then appears, that "file descriptor"
will be considered late and could be dropped, or it might be read; it all
depends on the windowing strategy the pipeline author has chosen.
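The hold-back rule in this example reduces to a small calculation: downstream of FileIO, the watermark is the earliest of the watermark Watch still holds and the watermarks reported by the active file readers. Here is a hypothetical toy model in plain Java; `DownstreamWatermark` and `compute` are illustrative names, not Beam APIs, and the model ignores windowing, allowed lateness, and how a runner actually aggregates watermarks. `LocalTime` is used only to mirror the 1pm/2pm times of the example.

```java
import java.time.LocalTime;
import java.util.Collection;
import java.util.List;

class DownstreamWatermark {
  /**
   * Toy model: the watermark downstream of FileIO can be no later than what Watch
   * still holds, and no later than any active file reader's reported watermark.
   */
  static LocalTime compute(LocalTime watchWatermark, Collection<LocalTime> readerWatermarks) {
    LocalTime result = watchWatermark;
    for (LocalTime readerWatermark : readerWatermarks) {
      if (readerWatermark.isBefore(result)) {
        result = readerWatermark;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<LocalTime> readers = List.of(LocalTime.of(13, 20), LocalTime.of(13, 45));
    // Watch still scanning [1pm, 2pm): readers are ahead, but Watch holds 1pm.
    System.out.println(compute(LocalTime.of(13, 0), readers)); // 13:00
    // Watch moved on to [2pm, 3pm): the slowest reader (1:20pm) is now the bound.
    System.out.println(compute(LocalTime.of(14, 0), readers)); // 13:20
    // Every 1pm file has been fully read: only Watch's 2pm remains.
    System.out.println(compute(LocalTime.of(14, 0), List.of())); // 14:00
  }
}
```

Finished readers simply drop out of the collection, which is why the last call returns Watch's 2pm.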


> > There is SplittableParDoViaKeyedWorkItems[1] that can be used by classical
> > runners but it has limitations.
>
> Can you please elaborate on the limitations? Will we have a default translation
> override for unbounded SDF? The existing override for bounded SDF brought
> support to almost every runner (even if in a naive way) and allowed IO authors
> to develop IOs that could be tested easily, so it looks worthwhile to have a
> default override for the unbounded case.
>

PR 10897[1] added an UnboundedSource -> SDF wrapper, but it still lacks
deduplication support (I have some preliminary work in a branch[2]) and
progress reporting. (Dynamic work rebalancing for unbounded sources is not
supported in any runner.)
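For context on the missing deduplication: an UnboundedSource can declare that its output needs deduplicating (requiresDeduping()) and expose a per-record ID through its reader (getCurrentRecordId()). A minimal sketch of ID-based deduplication, assuming string record IDs and a hypothetical count-bounded memory of seen IDs, is shown here; `RecordDeduplicator` is an illustrative name, and this is not the approach taken in the branch mentioned above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

class RecordDeduplicator {
  private final int capacity;
  // Insertion-ordered so the oldest IDs can be evicted first.
  private final LinkedHashSet<String> seen = new LinkedHashSet<>();

  RecordDeduplicator(int capacity) {
    this.capacity = capacity;
  }

  /** Returns true if the record should be emitted, false if its ID was already seen. */
  boolean admit(String recordId) {
    if (!seen.add(recordId)) {
      return false; // duplicate within the remembered window
    }
    if (seen.size() > capacity) {
      seen.remove(seen.iterator().next()); // forget the oldest ID
    }
    return true;
  }

  public static void main(String[] args) {
    RecordDeduplicator dedup = new RecordDeduplicator(1000);
    List<String> emitted = new ArrayList<>();
    for (String id : List.of("a", "b", "a", "c", "b")) {
      if (dedup.admit(id)) {
        emitted.add(id);
      }
    }
    System.out.println(emitted); // [a, b, c]
  }
}
```

A production version would need to keep the seen IDs in durable, per-key state that survives checkpoints and would likely bound retention by time rather than by count; both are left out of this sketch.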

SplittableParDoViaKeyedWorkItems doesn't support bundle finalization, which
is necessary to support the finalizeCheckpoint callback from
CheckpointMark[3] in UnboundedSource. Since
SplittableParDoViaKeyedWorkItems is built on top of the DoFn/PTransform APIs,
it would require adding support for the BundleFinalizer[4] to all classic
runners. I would greatly appreciate help in getting BundleFinalizer
working on classic runners, which I believe would unblock naive support for
unbounded SDFs on classic runners.
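The contract that bundle finalization provides (and that finalizeCheckpoint relies on, for example to acknowledge messages in a queue) is that callbacks registered while processing run only after the runner has durably committed the bundle's output. A minimal, hypothetical model of that contract in plain Java follows; `BundleModel` and its `afterBundleCommit(Runnable)` / `finish(boolean)` methods are simplified illustrative names, not the real DoFn.BundleFinalizer signature.

```java
import java.util.ArrayList;
import java.util.List;

class BundleModel {
  // Callbacks registered while the bundle is being processed.
  private final List<Runnable> pendingCallbacks = new ArrayList<>();

  // DoFn side: ask to be called back once this bundle's output is committed.
  void afterBundleCommit(Runnable callback) {
    pendingCallbacks.add(callback);
  }

  // Runner side: run callbacks only if the bundle's output was durably committed.
  void finish(boolean committed) {
    if (committed) {
      for (Runnable callback : pendingCallbacks) {
        callback.run();
      }
    }
    // On failure the callbacks are dropped, so unacknowledged data gets redelivered.
    pendingCallbacks.clear();
  }

  public static void main(String[] args) {
    List<String> acked = new ArrayList<>();

    BundleModel committedBundle = new BundleModel();
    committedBundle.afterBundleCommit(() -> acked.add("msg-1")); // e.g. ack a queue message
    committedBundle.finish(true);

    BundleModel failedBundle = new BundleModel();
    failedBundle.afterBundleCommit(() -> acked.add("msg-2"));
    failedBundle.finish(false);

    System.out.println(acked); // [msg-1]
  }
}
```

A runner has to implement the `finish` side of this contract itself, which is why supporting it requires changes in every classic runner rather than only in a DoFn-level wrapper.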

1: https://github.com/apache/beam/pull/10897
2: https://github.com/lukecwik/incubator-beam/tree/splittabledofn3
3: https://github.com/apache/beam/blob/ded686a58ad4747e91a26d3e59f61019b641e655/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L130
4: https://github.com/apache/beam/blob/ded686a58ad4747e91a26d3e59f61019b641e655/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1174



Re: Java SplittableDoFn Watermark API

Posted by Ismaël Mejía <ie...@gmail.com>.
> Bounded SDFs are allowed to have a method signature which has void as the
> return type OR a ProcessContinuation. Unbounded SDFs must use a
> ProcessContinuation as the return type.  The "void" return case improves ease
> of use since it is likely to be the common case for bounded SDFs.

Luke, thanks for the answer. So I can assume that in the processElement of an
existing bounded SDF like HBaseReadSplittableDoFn I can also return a
ProcessContinuation? If so, maybe it is worth making this explicit (at least in
some example) instead of relying on the 'ease of use' approach. To clarify my point
(and to connect it with the WatermarkEstimator concept), what I am trying to
understand is how to evolve an existing bounded SDF into an unbounded one. Is
it just the signature plus assigning a different WatermarkEstimator? Again, I
think an example is worth a thousand words.
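The signature difference under discussion (a bounded SDF may return void or a ProcessContinuation, an unbounded SDF must return a ProcessContinuation) can be illustrated with a dependency-free sketch. It assumes a hypothetical `Continuation` enum standing in for DoFn.ProcessContinuation and a half-open offset range standing in for a restriction; `ContinuationSketch` is an illustrative name, and the choice of watermark estimator is deliberately not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

class ContinuationSketch {
  // Hypothetical stand-in for DoFn.ProcessContinuation.
  enum Continuation { STOP, RESUME }

  /**
   * Reads offsets in [from, end); end == Long.MAX_VALUE models an unbounded restriction.
   * 'available' says whether the record at an offset exists yet. Returns STOP when the
   * restriction is exhausted and RESUME when it must be checkpointed and retried later.
   */
  static Continuation process(long from, long end, LongPredicate available, List<Long> out) {
    for (long offset = from; offset < end; offset++) {
      if (!available.test(offset)) {
        return Continuation.RESUME; // nothing to read yet: checkpoint and come back
      }
      out.add(offset);
    }
    return Continuation.STOP; // whole restriction processed
  }

  public static void main(String[] args) {
    List<Long> out = new ArrayList<>();
    // Bounded [0, 3) with all data present: always STOP, which is what a void method implies.
    System.out.println(process(0, 3, o -> true, out) + " " + out); // STOP [0, 1, 2]
    out.clear();
    // Unbounded: only offsets below 2 exist so far, so the DoFn asks to be resumed.
    System.out.println(process(0, Long.MAX_VALUE, o -> o < 2, out) + " " + out); // RESUME [0, 1]
  }
}
```

The same loop covers both cases; what changes is whether the restriction has an end. A bounded restriction can also return RESUME, which matches the earlier idea of a bounded source checkpointing when a resource is temporarily unavailable.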

> There is SplittableParDoViaKeyedWorkItems[1] that can be used by classical
> runners but it has limitations.

Can you please elaborate on the limitations? Will we have a default translation
override for unbounded SDF? The existing override for bounded SDF brought
support to almost every runner (even if in a naive way) and allowed IO authors
to develop IOs that could be tested easily, so it looks worthwhile to have a
default override for the unbounded case.

On Tue, Mar 3, 2020 at 7:03 PM Luke Cwik <lc...@google.com> wrote:
>
>
>
> On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>
>> > the unification of bounded/unbounded within SplittableDoFn has always been a goal.
>>
>> I am glad to know that my intuition is correct and that this was envisioned; the
>> idea of checkpoints for bounded inputs sounds really useful. Eager to try
>> that in practice.
>>
>> An explicit example (with a WatermarkEstimator for a bounded case) would be
>> really nice to see, for learning purposes. Also, given the unification goal, what
>> if we aligned bounded SDFs to have similar signatures, i.e. a process
>> method that returns a continuation even for the bounded case?
>
>
> Bounded SDFs are allowed to have a method signature which has void as the return type OR a ProcessContinuation. Unbounded SDFs must use a ProcessContinuation as the return type.  The "void" return case improves ease of use since it is likely to be the common case for bounded SDFs.
>
>>
>> > Currently the watermark that is reported as part of the PollResult is passed
>> > to the ProcessContext.updateWatermark [1, 2] function and instead that call
>> > would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
>>
>> Is there a JIRA for the Watch adjustments so we don't forget to integrate the
>> WatermarkEstimators? I am really curious about the implementation, to see if I
>> finally understand the internals of Watch too.
>
>
> Migrating from ProcessContext#updateWatermark to WatermarkEstimators will require updating Watch. Filed BEAM-9430.
>
>>
>> Extra question: Do you think we can have a naive version of unbounded SDF on
>> classical runners, like the naive one we have for bounded SDF (if I understood
>> correctly, the current one is only for portable runners)? I worry about the adoption potential.
>
>
> There is SplittableParDoViaKeyedWorkItems[1] that can be used by classical runners but it has limitations.
>
> 1: https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
>
>>
>> On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <ro...@google.com> wrote:
>> >
>> > I don't have a strong preference for using a provider/having a set of
>> > tightly coupled methods in Java, other than that we be consistent (and
>> > we already use the methods style for restrictions).
>> >
>> > On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
>> > >
>> > > Jan, there are some parts of Apache Beam that the watermarks package will likely rely on (the @Experimental annotation, javadoc links), but fundamentally it should not rely on core, and someone could create a separate package for it.
>> >
>> > I think it does make sense for a set of common watermark trackers to
>> > be shipped with core (e.g. manual, monotonic, and eventually a
>> > probabilistic one).
>> >
>> > > Ismael, the unification of bounded/unbounded within SplittableDoFn has always been a goal. There are a set of features that BoundedSources are unlikely to use but would still be allowed to use them. For example, bounded sources may want to have support for checkpointing since I could foresee an BoundedSource that can notice that a certain resource becomes unavailable and can only process it later. The choice of which watermark estimator to use is a likely point of difference between bounded and unbounded SDFs since bounded SDFs would be able to use a very simple estimator where the watermark is held at -infinity and only advances to +infinity once there is no more data to process. But even though unbounded SDFs are likely to be the most common users of varied watermark estimators, a bounded SDF may still want to advance the watermark as they read records so that runners that are more "streaming" (for example micro batch) could process the entire pipeline in parallel vs other runners that execute one whole segment of the pipeline at a time.
>> >
>> > Put another way, the value of watermark trackers is to allow
>> > processing to continue downstream before the source has completed
>> > reading. This is of course essential for streaming, but If the source
>> > is read to completion before downstream stages start (as is the case
>> > for most batch runners) it is not needed. What this unification does
>> > allow, however, is a source to be written in such a way that can be
>> > efficiently used in both batch and streaming mode.
>> >
>> > > Currently the watermark that is reported as part of the PollResult is passed to the ProcessContext.updateWatermark [1, 2] function and instead that call would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
>> > >
>> > > 1: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
>> > > 2: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
>> > > 3: https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
>> > >
>> > > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ie...@gmail.com> wrote:
>> > >>
>> > >> I just realized that the HBaseIO example is not a good one because we can
>> > >> already have Watch like behavior as we do for Partition discovery in HCatalogIO.
>> > >> Still I am interested on your views on bounded/unbounded unification.
>> > >>
>> > >> Interesting question2: How this will annotations connect with the Watch
>> > >> transform Polling patterns?
>> > >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
>> > >>
>> > >>
>> > >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ie...@gmail.com> wrote:
>> > >>>
>> > >>> Really interesting! Implementing correctly the watermark has been a common
>> > >>> struggle for IO authors, to the point that some IOs still have issues around
>> > >>> that. So +1 for this, in particular if we can get to reuse common patterns.
>> > >>> I was not aware of Boyuan's work around this, really nice.
>> > >>>
>> > >>> One aspect I have always being confused about since I read the SDF proposal
>> > >>> documents is if we could get to have a single API for both Bounded and Unbounded
>> > >>> IO by somehow assuming that with a BoundedSDF is an UnboundedSDF special case.
>> > >>> Could WatermarkEstimator help in this direction?
>> > >>>
>> > >>> One quick case that I can think is to make the current HBaseIO SDF to work in an
>> > >>> unbounded manner, for example to 'watch and read new tables'.
>> > >>>
>> > >>>
>> > >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
>> > >>>>
>> > >>>> See this doc[1] and blog[2] for some context about SplittableDoFns.
>> > >>>>
>> > >>>> To support watermark reporting within the Java SDK for SplittableDoFns, we need a way to have SDF authors to report watermark estimates over the element and restriction pair that they are processing.
>> > >>>>
>> > >>>> For UnboundedSources, it was found to be a pain point to ask each SDF author to write their own watermark estimation which typically prevented re-use. Therefore we would like to have a "library" of watermark estimators that help SDF authors perform this estimation similar to how there is a "library" of restrictions and restriction trackers that SDF authors can use. For SDF authors where the existing library doesn't work, they can add additional ones that observe timestamps of elements or choose to directly report the watermark through a "ManualWatermarkEstimator" parameter that can be supplied to @ProcessElement methods.
>> > >>>>
>> > >>>> The public facing portion of the DoFn changes adds three new annotations for new DoFn style methods:
>> > >>>> GetInitialWatermarkEstimatorState: Returns the initial watermark state, similar to GetInitialRestriction
>> > >>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with watermark state type, similar to GetRestrictionCoder for restrictions returned by GetInitialRestriction.
>> > >>>> NewWatermarkEstimator: Returns a watermark estimator that either the framework invokes allowing it to observe the timestamps of output records or a manual watermark estimator that can be explicitly invoked to update the watermark.
>> > >>>>
>> > >>>> See [3] for an initial PR with the public facing additions to the core Java API related to SplittableDoFn.
>> > >>>>
>> > >>>> This mirrors a bunch of work that was done by Boyuan within the Pyhon SDK [4, 5] but in the style of new DoFn parameter/method invocation we have in the Java SDK.
>> > >>>>
>> > >>>> 1: https://s.apache.org/splittable-do-fn
>> > >>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>> > >>>> 3: https://github.com/apache/beam/pull/10992
>> > >>>> 4: https://github.com/apache/beam/pull/9794
>> > >>>> 5: https://github.com/apache/beam/pull/10375

Re: Java SplittableDoFn Watermark API

Posted by Luke Cwik <lc...@google.com>.
On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía <ie...@gmail.com> wrote:

> > the unification of bounded/unbounded within SplittableDoFn has always
> > been a goal.
>
> I am glad to know that my intuition is correct and that this was
> envisioned; the idea of checkpoints for bounded inputs sounds really
> useful. I'm eager to try that in practice.
>
> An explicit example (with a WatermarkEstimator for a bounded case) would be
> really nice to see, for learning purposes. Also, given the unification goal,
> what if we align the Bounded SDFs to have similar signatures? I mean the
> method that returns a continuation even for the Bounded case.
>

Bounded SDFs are allowed to have a method signature which has void as the
return type OR a ProcessContinuation. Unbounded SDFs must use a
ProcessContinuation as the return type. The "void" return case improves
ease of use since it is likely to be the common case for bounded SDFs.
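To illustrate the two return styles together with the bounded-checkpointing idea raised earlier in the thread, here is a plain-Java model with no Beam dependency. `Continuation`, `OffsetRangeSketch`, and `BoundedReaderSketch` are hypothetical stand-ins for ProcessContinuation and a restriction tracker, not the SDK's classes, and the sketch assumes that a void-returning bounded method behaves like one that always returns STOP.

```java
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical stand-in for ProcessContinuation. STOP: the restriction is finished
// (what a void-returning bounded method implies). RESUME: checkpoint, call again later.
enum Continuation { STOP, RESUME }

// Hypothetical stand-in for an offset-range restriction tracker over [from, to).
final class OffsetRangeSketch {
  private final long to;
  private long next;

  OffsetRangeSketch(long from, long to) {
    this.next = from;
    this.to = to;
  }

  boolean hasRemaining() {
    return next < to;
  }

  // Claims `offset` only if it is the next unclaimed offset inside the range.
  boolean tryClaim(long offset) {
    if (offset != next || offset >= to) {
      return false;
    }
    next++;
    return true;
  }

  // First unclaimed offset: where the residual work starts after a checkpoint.
  long position() {
    return next;
  }
}

final class BoundedReaderSketch {
  // Reads offsets in order. If the backing resource is unavailable, returns RESUME so the
  // remaining offsets can be processed later instead of failing or blocking.
  static Continuation readSome(OffsetRangeSketch range, LongPredicate available, List<Long> out) {
    while (range.hasRemaining()) {
      long offset = range.position();
      if (!available.test(offset)) {
        return Continuation.RESUME;
      }
      if (!range.tryClaim(offset)) {
        return Continuation.STOP;
      }
      out.add(offset);
    }
    return Continuation.STOP;
  }
}
```

In this model a runner keeps calling `readSome` while it returns RESUME, each time continuing from `position()`; a bounded source that never needs to checkpoint can ignore the return value entirely, which is the common case the void signature is meant to keep simple.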


> > Currently the watermark that is reported as part of the PollResult is passed
> > to the ProcessContext.updateWatermark [1, 2] function and instead that call
> > would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
>
> Is there a JIRA for the Watch adjustments so we don't forget to integrate the
> WatermarkEstimators in? I am really curious about the implementation, to see
> if I finally understand the internals of Watch too.
>

Migrating from ProcessContext#updateWatermark to WatermarkEstimators will
require updating Watch. Filed BEAM-9430.
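A rough model of the Watch change described above, under stated assumptions: `PollResultSketch`, `ManualEstimatorSketch`, and `WatchStepSketch.handlePoll` are hypothetical simplifications, not the real Watch internals or Beam's ManualWatermarkEstimator; timestamps are plain `long` milliseconds, and a null watermark means the poll did not report one.

```java
import java.util.List;

// Hypothetical stand-in for a manual watermark estimator: the caller sets the value.
final class ManualEstimatorSketch {
  private long watermark;

  ManualEstimatorSketch(long initialWatermark) {
    this.watermark = initialWatermark;
  }

  void setWatermark(long watermark) {
    this.watermark = watermark;
  }

  long currentWatermark() {
    return watermark;
  }
}

// Hypothetical, simplified shape of one poll: new outputs plus an optional watermark.
final class PollResultSketch {
  final List<String> outputs;
  final Long watermark; // null when the poll function did not report one

  PollResultSketch(List<String> outputs, Long watermark) {
    this.outputs = outputs;
    this.watermark = watermark;
  }
}

final class WatchStepSketch {
  // Emits the polled outputs and forwards any reported watermark to the estimator,
  // in place of the previous ProcessContext.updateWatermark call.
  static void handlePoll(PollResultSketch poll, ManualEstimatorSketch estimator, List<String> out) {
    out.addAll(poll.outputs);
    if (poll.watermark != null) {
      estimator.setWatermark(poll.watermark);
    }
  }
}
```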


> Extra question: Do you think we can have a naive version of Unbounded SDF
> like the naive one we have on classical runners? (If I understood correctly,
> the current one is only for portable runners.) I worry about the adoption
> potential.
>

There is SplittableParDoViaKeyedWorkItems[1] that can be used by classical
runners but it has limitations.

1:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Re: Java SplittableDoFn Watermark API

Posted by Ismaël Mejía <ie...@gmail.com>.
> the unification of bounded/unbounded within SplittableDoFn has always been a goal.

I am glad to know that my intuition is correct and that this was envisioned; the
idea of checkpoints for bounded inputs sounds really useful. I'm eager to try
that in practice.

An explicit example (with a WatermarkEstimator for a bounded case) would be
really nice to see, for learning purposes. Also, given the unification goal,
what if we align the Bounded SDFs to have similar signatures? I mean the
method that returns a continuation even for the Bounded case.
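For the bounded case, the simplest estimator mentioned in this thread holds the watermark at -infinity while data remains and moves it to +infinity only once the restriction is finished. A plain-Java sketch, assuming `Long.MIN_VALUE` and `Long.MAX_VALUE` as stand-ins for Beam's minimum and maximum timestamps; `BoundedHoldEstimatorSketch` and `watermarkTrace` are hypothetical names, not Beam API.

```java
import java.util.ArrayList;
import java.util.List;

final class BoundedHoldEstimatorSketch {
  private boolean done;

  // Called once the restriction has no more data to process.
  void markDone() {
    done = true;
  }

  // "-infinity" while reading, "+infinity" afterwards; record timestamps are ignored.
  long currentWatermark() {
    return done ? Long.MAX_VALUE : Long.MIN_VALUE;
  }

  // Simulates reading every record, capturing the watermark after each one
  // and once more after the read completes.
  static List<Long> watermarkTrace(List<Long> recordTimestamps) {
    BoundedHoldEstimatorSketch estimator = new BoundedHoldEstimatorSketch();
    List<Long> trace = new ArrayList<>();
    for (Long ignored : recordTimestamps) {
      trace.add(estimator.currentWatermark());
    }
    estimator.markDone();
    trace.add(estimator.currentWatermark());
    return trace;
  }
}
```

A runner that executes one stage after another loses nothing with this estimator; the cost appears only on more streaming-oriented runners, which cannot start downstream work until the final jump.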

> Currently the watermark that is reported as part of the PollResult is passed
> to the ProcessContext.updateWatermark [1, 2] function and instead that call
> would be redirected to the ManualWatermarkEstimator.setWatermark function [3].

Is there a JIRA for the Watch adjustments so we don't forget to integrate the
WatermarkEstimators in? I am really curious about the implementation, to see if I
finally understand the internals of Watch too.

Extra question: Do you think we can have a naive version of Unbounded SDF like
the naive one we have on classical runners? (If I understood correctly, the
current one is only for portable runners.) I worry about the adoption potential.

On Tue, Mar 3, 2020 at 1:41 AM Robert Bradshaw <ro...@google.com> wrote:
>
> I don't have a strong preference for using a provider/having a set of
> tightly coupled methods in Java, other than that we be consistent (and
> we already use the methods style for restrictions).
>
> On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
> >
> > Jan, there are some parts of Apache Beam that the watermarks package will likely rely on (the @Experimental annotation, javadoc links), but fundamentally it should not rely on core, and someone could create a separate package for this.
>
> I think it does make sense for a set of common watermark trackers to
> be shipped with core (e.g. manual, monotonic, and eventually a
> probabilistic one).
>
> > Ismael, the unification of bounded/unbounded within SplittableDoFn has always been a goal. There is a set of features that BoundedSources are unlikely to use but would still be allowed to use. For example, bounded sources may want support for checkpointing, since I could foresee a BoundedSource that notices that a certain resource has become unavailable and can only process it later. The choice of which watermark estimator to use is a likely point of difference between bounded and unbounded SDFs, since bounded SDFs would be able to use a very simple estimator where the watermark is held at -infinity and only advances to +infinity once there is no more data to process. But even though unbounded SDFs are likely to be the most common users of varied watermark estimators, a bounded SDF may still want to advance the watermark as it reads records, so that runners that are more "streaming" (for example micro batch) could process the entire pipeline in parallel, versus other runners that execute one whole segment of the pipeline at a time.
>
> Put another way, the value of watermark trackers is to allow
> processing to continue downstream before the source has completed
> reading. This is of course essential for streaming, but if the source
> is read to completion before downstream stages start (as is the case
> for most batch runners) it is not needed. What this unification does
> allow, however, is for a source to be written in such a way that it can
> be efficiently used in both batch and streaming mode.
>
> > Currently the watermark that is reported as part of the PollResult is passed to the ProcessContext.updateWatermark [1, 2] function and instead that call would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
> >
> > 1: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> > 2: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> > 3: https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
> >
> > On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ie...@gmail.com> wrote:
> >>
> >> I just realized that the HBaseIO example is not a good one because we can
> >> already have Watch-like behavior, as we do for partition discovery in HCatalogIO.
> >> Still, I am interested in your views on bounded/unbounded unification.
> >>
> >> Interesting question 2: How will these annotations connect with the Watch
> >> transform polling patterns?
> >> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
> >>
> >>
> >> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ie...@gmail.com> wrote:
> >>>
> >>> Really interesting! Correctly implementing the watermark has been a common
> >>> struggle for IO authors, to the point that some IOs still have issues around
> >>> that. So +1 for this, in particular if we get to reuse common patterns.
> >>> I was not aware of Boyuan's work around this, really nice.
> >>>
> >>> One aspect I have always been confused about since I read the SDF proposal
> >>> documents is whether we could have a single API for both Bounded and Unbounded
> >>> IO by treating a BoundedSDF as a special case of an UnboundedSDF.
> >>> Could WatermarkEstimator help in this direction?
> >>>
> >>> One quick case I can think of is making the current HBaseIO SDF work in an
> >>> unbounded manner, for example to 'watch and read new tables'.
> >>>
> >>>
> >>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
> >>>>
> >>>> See this doc[1] and blog[2] for some context about SplittableDoFns.
> >>>>
> >>>> To support watermark reporting within the Java SDK for SplittableDoFns, we need a way for SDF authors to report watermark estimates over the element and restriction pair that they are processing.
> >>>>
> >>>> For UnboundedSources, it was found to be a pain point to ask each SDF author to write their own watermark estimation, which typically prevented re-use. Therefore we would like to have a "library" of watermark estimators that help SDF authors perform this estimation, similar to how there is a "library" of restrictions and restriction trackers that SDF authors can use. SDF authors for whom the existing library doesn't work can add additional estimators that observe the timestamps of elements, or can choose to report the watermark directly through a "ManualWatermarkEstimator" parameter that can be supplied to @ProcessElement methods.
> >>>>
> >>>> The public-facing portion of the DoFn changes adds three new annotations for new DoFn-style methods:
> >>>> GetInitialWatermarkEstimatorState: Returns the initial watermark state, similar to GetInitialRestriction.
> >>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with the watermark state type, similar to GetRestrictionCoder for restrictions returned by GetInitialRestriction.
> >>>> NewWatermarkEstimator: Returns either a watermark estimator that the framework invokes, allowing it to observe the timestamps of output records, or a manual watermark estimator that can be explicitly invoked to update the watermark.
> >>>>
> >>>> See [3] for an initial PR with the public-facing additions to the core Java API related to SplittableDoFn.
> >>>>
> >>>> This mirrors a bunch of work that was done by Boyuan within the Python SDK [4, 5] but in the style of new DoFn parameter/method invocation we have in the Java SDK.
> >>>>
> >>>> 1: https://s.apache.org/splittable-do-fn
> >>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> >>>> 3: https://github.com/apache/beam/pull/10992
> >>>> 4: https://github.com/apache/beam/pull/9794
> >>>> 5: https://github.com/apache/beam/pull/10375

Re: Java SplittableDoFn Watermark API

Posted by Robert Bradshaw <ro...@google.com>.
I don't have a strong preference for using a provider/having a set of
tightly coupled methods in Java, other than that we be consistent (and
we already use the methods style for restrictions).

On Mon, Mar 2, 2020 at 3:32 PM Luke Cwik <lc...@google.com> wrote:
>
> Jan, there are some parts of Apache Beam that the watermarks package will likely rely on (the @Experimental annotation, javadoc links), but fundamentally it should not rely on core, and someone could create a separate package for this.

I think it does make sense for a set of common watermark trackers to
be shipped with core (e.g. manual, monotonic, and eventually a
probabilistic one).
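A plain-Java sketch of two of the common trackers named here (manual and monotonic), assuming timestamps are `long` milliseconds and that the framework calls `observeTimestamp` for each output. The interface and class names are hypothetical and do not reproduce Beam's actual estimator classes; the probabilistic tracker is left out.

```java
// Hypothetical minimal contract shared by the sketches below.
interface WatermarkEstimatorSketch {
  long currentWatermark();
}

// The SDF author reports the watermark explicitly.
final class ManualWatermarkSketch implements WatermarkEstimatorSketch {
  private long watermark = Long.MIN_VALUE;

  void setWatermark(long watermark) {
    this.watermark = watermark;
  }

  @Override
  public long currentWatermark() {
    return watermark;
  }
}

// The framework shows the estimator each output timestamp; it keeps the maximum seen,
// so a smaller (out-of-order) timestamp never moves the watermark backwards.
final class MonotonicWatermarkSketch implements WatermarkEstimatorSketch {
  private long watermark = Long.MIN_VALUE;

  void observeTimestamp(long timestamp) {
    watermark = Math.max(watermark, timestamp);
  }

  @Override
  public long currentWatermark() {
    return watermark;
  }
}
```

The difference is who drives the value: the manual estimator trusts whatever the author asserts, while the monotonic one derives the watermark from output timestamps and therefore assumes outputs arrive roughly in timestamp order.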

> Ismael, the unification of bounded/unbounded within SplittableDoFn has always been a goal. There is a set of features that BoundedSources are unlikely to use but would still be allowed to use. For example, bounded sources may want support for checkpointing, since I could foresee a BoundedSource that notices that a certain resource has become unavailable and can only process it later. The choice of which watermark estimator to use is a likely point of difference between bounded and unbounded SDFs, since bounded SDFs would be able to use a very simple estimator where the watermark is held at -infinity and only advances to +infinity once there is no more data to process. But even though unbounded SDFs are likely to be the most common users of varied watermark estimators, a bounded SDF may still want to advance the watermark as it reads records, so that runners that are more "streaming" (for example micro batch) could process the entire pipeline in parallel, versus other runners that execute one whole segment of the pipeline at a time.

Put another way, the value of watermark trackers is to allow
processing to continue downstream before the source has completed
reading. This is of course essential for streaming, but if the source
is read to completion before downstream stages start (as is the case
for most batch runners) it is not needed. What this unification does
allow, however, is for a source to be written in such a way that it can
be efficiently used in both batch and streaming mode.
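The benefit described above can be made concrete with a small calculation. Assuming fixed, non-overlapping windows `[k * size, (k + 1) * size)` that a downstream stage treats as complete once the watermark reaches the window end (a simplification of real trigger semantics), the hypothetical helper below counts how many windows could already be emitted for a given watermark.

```java
final class DownstreamProgressSketch {
  // Number of leading windows [k * size, (k + 1) * size), k = 0..totalWindows-1, whose end
  // is at or before `watermark`, i.e. windows downstream could finalize now. Assumes size > 0.
  static int completeWindows(long watermark, long size, int totalWindows) {
    if (watermark < size) {
      return 0; // includes Long.MIN_VALUE, the "held" bounded watermark
    }
    return (int) Math.min(totalWindows, watermark / size);
  }
}
```

With an estimator that advances as records are read, a watermark of 25 already lets two of, say, five 10 ms windows be emitted while the source is still reading; with a held watermark the count stays at 0 until the jump to the maximum, when all five complete at once.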

> Currently the watermark that is reported as part of the PollResult is passed to the ProcessContext.updateWatermark [1, 2] function and instead that call would be redirected to the ManualWatermarkEstimator.setWatermark function [3].
>
> 1: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L757
> 2: https://github.com/apache/beam/blob/a16725593b84b84b37bc67cd202d1ac8b724c6f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L275
> 3: https://github.com/apache/beam/blob/fb42666a4e1aec0413f161c742d8f010ef9fe9f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java#L45
>
> On Fri, Feb 28, 2020 at 6:09 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>
>> I just realized that the HBaseIO example is not a good one because we can
>> already have Watch-like behavior, as we do for partition discovery in HCatalogIO.
>> Still, I am interested in your views on bounded/unbounded unification.
>>
>> Interesting question 2: How will these annotations connect with the Watch
>> transform polling patterns?
>> https://github.com/apache/beam/blob/650e6cd9c707472c34055382a1356cf22d14ee5e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L178
>>
>>
>> On Fri, Feb 28, 2020 at 10:47 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>>
>>> Really interesting! Correctly implementing the watermark has been a common
>>> struggle for IO authors, to the point that some IOs still have issues around
>>> that. So +1 for this, in particular if we get to reuse common patterns.
>>> I was not aware of Boyuan's work around this, really nice.
>>>
>>> One aspect I have always been confused about since I read the SDF proposal
>>> documents is whether we could have a single API for both Bounded and Unbounded
>>> IO by treating a BoundedSDF as a special case of an UnboundedSDF.
>>> Could WatermarkEstimator help in this direction?
>>>
>>> One quick case I can think of is making the current HBaseIO SDF work in an
>>> unbounded manner, for example to 'watch and read new tables'.
>>>
>>>
>>> On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>> See this doc[1] and blog[2] for some context about SplittableDoFns.
>>>>
>>>> To support watermark reporting within the Java SDK for SplittableDoFns, we need a way for SDF authors to report watermark estimates over the element and restriction pair that they are processing.
>>>>
>>>> For UnboundedSources, it was found to be a pain point to ask each SDF author to write their own watermark estimation, which typically prevented re-use. Therefore we would like to have a "library" of watermark estimators that help SDF authors perform this estimation, similar to how there is a "library" of restrictions and restriction trackers that SDF authors can use. SDF authors for whom the existing library doesn't work can add additional estimators that observe the timestamps of elements, or can choose to report the watermark directly through a "ManualWatermarkEstimator" parameter that can be supplied to @ProcessElement methods.
>>>>
>>>> The public-facing portion of the DoFn changes adds three new annotations for new DoFn-style methods:
>>>> GetInitialWatermarkEstimatorState: Returns the initial watermark state, similar to GetInitialRestriction.
>>>> GetWatermarkEstimatorStateCoder: Returns a coder compatible with the watermark state type, similar to GetRestrictionCoder for restrictions returned by GetInitialRestriction.
>>>> NewWatermarkEstimator: Returns either a watermark estimator that the framework invokes, allowing it to observe the timestamps of output records, or a manual watermark estimator that can be explicitly invoked to update the watermark.
>>>>
>>>> See [3] for an initial PR with the public-facing additions to the core Java API related to SplittableDoFn.
>>>>
>>>> This mirrors a bunch of work that was done by Boyuan within the Python SDK [4, 5] but in the style of new DoFn parameter/method invocation we have in the Java SDK.
>>>>
>>>> 1: https://s.apache.org/splittable-do-fn
>>>> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>>>> 3: https://github.com/apache/beam/pull/10992
>>>> 4: https://github.com/apache/beam/pull/9794
>>>> 5: https://github.com/apache/beam/pull/10375
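To show how the three proposed methods could interact across a checkpoint, here is a plain-Java sketch with no Beam dependency. The method names only echo the annotations for readability; their signatures, the `long`-valued state, the monotonic estimator, and the `byte[]` encoding that stands in for the state coder are all hypothetical and are not the API in the linked PR.

```java
import java.nio.ByteBuffer;

final class SdfLifecycleSketch {
  // Hypothetical estimator whose entire state is its current watermark.
  static final class Estimator {
    private long watermark;

    Estimator(long initial) {
      this.watermark = initial;
    }

    // Invoked for each output timestamp; keeps the maximum seen so far.
    void observeTimestamp(long timestamp) {
      watermark = Math.max(watermark, timestamp);
    }

    long currentWatermark() {
      return watermark;
    }

    long getState() {
      return watermark;
    }
  }

  // ~ GetInitialWatermarkEstimatorState: where the watermark starts for a new element.
  static long getInitialWatermarkEstimatorState() {
    return Long.MIN_VALUE;
  }

  // ~ GetWatermarkEstimatorStateCoder: how the state is persisted across a checkpoint.
  static byte[] encodeState(long state) {
    return ByteBuffer.allocate(Long.BYTES).putLong(state).array();
  }

  static long decodeState(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  // ~ NewWatermarkEstimator: builds an estimator from initial or restored state.
  static Estimator newWatermarkEstimator(long state) {
    return new Estimator(state);
  }
}
```

In this model it is the estimator's encoded state, not the estimator object, that survives a checkpoint: the resumed estimator is rebuilt from the decoded state and so does not fall back below the watermark reached before the checkpoint.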