Posted to dev@beam.apache.org by Luke Cwik <lc...@google.com> on 2020/08/10 20:59:45 UTC

[DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

In the past couple of months wrappers[1, 2] have been added to the Beam
Java SDK which can execute BoundedSource and UnboundedSource as Splittable
DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner
v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all
other pipelines.

I would like to start making non-portable pipelines opt-out as well,
starting with the DirectRunner[3]. The plan is that eventually all runners
will execute only Splittable DoFns and the BoundedSource/UnboundedSource
specific execution logic will be removed from the runners.

Users will be able to opt any pipeline in using the experiment
'use_sdf_read' and opt out with the experiment 'use_deprecated_read'. (For
portable pipelines these experiments were 'beam_fn_api' and
'beam_fn_api_use_deprecated_read' respectively; I have added the two new
names as aliases to make the experience less confusing.)

1:
https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
2:
https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
3: https://github.com/apache/beam/pull/12519
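
The opt-in and opt-out rules described in this message can be sketched in plain Java. This is an illustration only, not code from the Beam SDK: the class `ReadModeSketch`, the method `usesSdfRead`, and the `runnerDefaultsToSdf` parameter are hypothetical names, and the rule that an opt-out wins when both kinds of experiment are present is an assumption. Only the four experiment names come from the message.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative sketch only; not the Beam SDK implementation. */
final class ReadModeSketch {
  // Experiment names from the message: the new alias and the older portable name.
  private static final Set<String> OPT_IN =
      new HashSet<>(Arrays.asList("use_sdf_read", "beam_fn_api"));
  private static final Set<String> OPT_OUT =
      new HashSet<>(Arrays.asList("use_deprecated_read", "beam_fn_api_use_deprecated_read"));

  /**
   * Returns true if a Read would be executed as a Splittable DoFn.
   * Assumption: an explicit opt-out takes precedence over an opt-in.
   */
  static boolean usesSdfRead(List<String> experiments, boolean runnerDefaultsToSdf) {
    boolean optedOut = experiments.stream().anyMatch(OPT_OUT::contains);
    boolean optedIn = experiments.stream().anyMatch(OPT_IN::contains);
    if (optedOut) {
      return false;
    }
    return optedIn || runnerDefaultsToSdf;
  }

  public static void main(String[] args) {
    // Runner still defaults to the legacy path; the user opts in.
    System.out.println(usesSdfRead(Arrays.asList("use_sdf_read"), false));
    // Runner defaults to SDF; the user opts out.
    System.out.println(usesSdfRead(Arrays.asList("use_deprecated_read"), true));
  }
}
```

In a real pipeline these names would be passed through the experiments pipeline option rather than evaluated by hand; the sketch only makes the intended precedence explicit.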

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Robert Burke <ro...@frantil.com>.
+1 to that. The programming guide is generally assumed to be up to date
which can't be said for arbitrary blog posts. Likely more discoverable too.

On Mon, Oct 19, 2020, 10:17 AM Luke Cwik <lc...@google.com> wrote:

> +Rose Nguyen <rt...@google.com> suggested that instead of just a blog,
> we should add the majority of the current blog's content to the core
> programming guide and either drop the blog and/or have a much smaller blog
> that links to the docs.
>
> I think this is a great idea, what do others think?
>
> On Wed, Oct 14, 2020 at 10:51 AM Luke Cwik <lc...@google.com> wrote:
>
>> Thanks Alexey, that is correct.
>>
>> On Wed, Oct 14, 2020 at 10:33 AM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>> Thanks Luke, just I guess that the proper link should be this one:
>>>
>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
>>>
>>> On 13 Oct 2020, at 00:23, Luke Cwik <lc...@google.com> wrote:
>>>
>>> I have a draft[1] of the blog ready. Please take a look.
>>>
>>> 1:
>>> http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE#heading=h.tbab2n97o3eo
>>>
>>> On Mon, Oct 5, 2020 at 4:28 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Mon, Oct 5, 2020 at 3:45 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Mon, Oct 5, 2020 at 2:44 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> For the 2.25 release the Java Direct, Flink, Jet, Samza, Twister2
>>>>>> will use SDF powered Read transforms. Users can opt-out
>>>>>> with --experiments=use_deprecated_read.
>>>>>>
>>>>>
>>>>> Huzzah! In our release notes maybe be clear about the expectations for
>>>>> users:
>>>>>
>>>> Done in https://github.com/apache/beam/pull/13015
>>>>
>>>>
>>>>>  - semantics are expected to be the same: file bugs for any change in
>>>>> results
>>>>>  - perf may vary: file bugs or write to user@
>>>>>
>>>>>> I was unable to get Spark done for 2.25 as I found out that Spark
>>>>>> streaming doesn't support watermark holds[1]. If someone knows more about
>>>>>> the watermark system in Spark I could use some guidance here as I believe I
>>>>>> have a version of unbounded SDF support written for Spark (I get all the
>>>>>> expected output from tests, just that watermarks aren't being held back so
>>>>>> PAssert fails).
>>>>>>
>>>>>
>>>>> Spark's watermarks are not comparable to Beam's. The rule as I
>>>>> understand it is that any data that is later than `max(seen timestamps) -
>>>>> allowedLateness` is dropped. One difference is that dropping is relative to
>>>>> the watermark instead of expiring windows, like early versions of Beam. The
>>>>> other difference is that it tracks the latest event (some call it a "high
>>>>> water mark" because it is the highest datetime value seen) where Beam's
>>>>> watermark is an approximation of the earliest (some call it a "low water
>>>>> mark" because it is a guarantee that it will not dip lower). When I chatted
>>>>> about this with Amit in the early days, it was necessary to implement a
>>>>> Beam-style watermark using Spark state. I think that may still be the case,
>>>>> for correct results.
>>>>>
>>>>>
>>>> In the Spark implementation I saw that watermark holds weren't wired at
>>>> all to control Spark's watermarks and this was causing triggers to fire too
>>>> early.
>>>>
>>>>
>>>>>> Also, I started a doc[2] to produce an updated blog post since the
>>>>>> original SplittableDoFn blog from 2017 is out of date[3]. I was thinking of
>>>>>> making this a new blog post and having the old blog post point to it. We
>>>>>> could also remove the old blog post and/or update it. Any thoughts?
>>>>>>
>>>>>
>>>>> New blog post w/ pointer from the old one.
>>>>>
>>>>>> Finally, I have a clean-up PR[4] that pushes the Read -> primitive
>>>>>> Read expansion into each of the runners instead of having it within Read
>>>>>> transform within beam-sdks-java-core.
>>>>>>
>>>>>
>>>>> Approved! I did CC a bunch of runner authors already. I think the
>>>>> important thing is if a default changes we should be sure everyone is OK
>>>>> with the perf changes, and everyone is confident that no incorrect results
>>>>> are produced. The abstractions between sdk-core, runners-core-*, and
>>>>> individual runners are important to me:
>>>>>
>>>>>  - The SDK's job is to produce a portable, un-tweaked pipeline so
>>>>> moving flags out of SDK core (and IOs) ASAP is super important.
>>>>>  - The runner's job is to execute that pipeline, if they can, however
>>>>> they want. If a runner wants to run Read transforms differently/directly
>>>>> that is fine. If a runner is incapable of supporting SDF, then Read is
>>>>> better than nothing. Etc.
>>>>>  - The runners-core-* job is to just be internal libraries for runner
>>>>> authors to share code, and should not make any decisions about the Beam
>>>>> model, etc.
>>>>>
>>>>> Kenn
>>>>>
>>>>>> 1: https://github.com/apache/beam/pull/12603
>>>>>> 2: http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
>>>>>> 3: https://beam.apache.org/blog/splittable-do-fn/
>>>>>> 4: https://github.com/apache/beam/pull/13006
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 28, 2020 at 1:45 AM Maximilian Michels <mx...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Luke! I've had a pass.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 28.08.20 01:22, Luke Cwik wrote:
>>>>>>> > As an update.
>>>>>>> >
>>>>>>> > Direct and Twister2 are done.
>>>>>>> > Samza: is ready for review[1].
>>>>>>> > Flink: is almost ready for review. [2] lays all the groundwork for the
>>>>>>> > migration and [3] finishes the migration (there is a timeout happening
>>>>>>> > in FlinkSubmissionTest that I'm trying to figure out).
>>>>>>> > No further updates on Spark[4] or Jet[5].
>>>>>>> >
>>>>>>> > @Maximilian Michels <ma...@apache.org> or @thw@apache.org
>>>>>>> > <ma...@gmail.com>, can either of you take a look at the
>>>>>>> > Flink PRs?
>>>>>>> > @ke.wu.cs@icloud.com <ma...@icloud.com>, since Xinyu delegated
>>>>>>> > to you, can you take another look at the Samza PR?
>>>>>>> >
>>>>>>> > 1: https://github.com/apache/beam/pull/12617
>>>>>>> > 2: https://github.com/apache/beam/pull/12706
>>>>>>> > 3: https://github.com/apache/beam/pull/12708
>>>>>>> > 4: https://github.com/apache/beam/pull/12603
>>>>>>> > 5: https://github.com/apache/beam/pull/12616
>>>>>>> >
>>>>>>> > On Tue, Aug 18, 2020 at 11:42 AM Pulasthi Supun Wickramasinghe
>>>>>>> > <pulasthi911@gmail.com <ma...@gmail.com>> wrote:
>>>>>>> >
>>>>>>> >     Hi Luke
>>>>>>> >
>>>>>>> >     Will take a look at this as soon as possible and get back to you.
>>>>>>> >
>>>>>>> >     Best Regards,
>>>>>>> >     Pulasthi
>>>>>>> >
>>>>>>> >     On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik <lcwik@google.com
>>>>>>> >     <ma...@google.com>> wrote:
>>>>>>> >
>>>>>>> >         I have made some good progress here and have gotten to the
>>>>>>> >         following state for non-portable runners:
>>>>>>> >
>>>>>>> >         DirectRunner[1]: Merged. Supports Read.Bounded and Read.Unbounded.
>>>>>>> >         Twister2[2]: Ready for review. Supports Read.Bounded, the
>>>>>>> >         current runner doesn't support unbounded pipelines.
>>>>>>> >         Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not
>>>>>>> >         certain about level of unbounded pipeline support coverage since
>>>>>>> >         Spark uses its own tiny suite of tests to get unbounded pipeline
>>>>>>> >         coverage instead of the validates runner set.
>>>>>>> >         Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely
>>>>>>> >         needs additional work.
>>>>>>> >         Samza[5]: WIP. Supports Read.Bounded. Not certain about level of
>>>>>>> >         unbounded pipeline support coverage since Spark uses its own
>>>>>>> >         tiny suite of tests to get unbounded pipeline coverage instead
>>>>>>> >         of the validates runner set.
>>>>>>> >         Flink: Unstarted.
>>>>>>> >
>>>>>>> >         @Pulasthi Supun Wickramasinghe <mailto:pulasthi911@gmail.com>,
>>>>>>> >         can you help me with the Twister2 PR[2]?
>>>>>>> >         @Ismaël Mejía <ma...@gmail.com>, is PR[3] the expected
>>>>>>> >         level of support for unbounded pipelines and hence ready for review?
>>>>>>> >         @Jozsef Bartok <ma...@hazelcast.com>, can you help me out
>>>>>>> >         to get support for unbounded splittable DoFns into Jet[4]?
>>>>>>> >         @Xinyu Liu <ma...@gmail.com>, is PR[5] the expected
>>>>>>> >         level of support for unbounded pipelines and hence ready for review?
>>>>>>> >
>>>>>>> >         1: https://github.com/apache/beam/pull/12519
>>>>>>> >         2: https://github.com/apache/beam/pull/12594
>>>>>>> >         3: https://github.com/apache/beam/pull/12603
>>>>>>> >         4: https://github.com/apache/beam/pull/12616
>>>>>>> >         5: https://github.com/apache/beam/pull/12617
>>>>>>> >
>>>>>>> >         On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <
>>>>>>> lcwik@google.com
>>>>>>> >         <ma...@google.com>> wrote:
>>>>>>> >
>>>>>>> >             There shouldn't be any changes required since the wrapper
>>>>>>> >             will smoothly transition the execution to be run as an SDF.
>>>>>>> >             New IOs should strongly prefer to use SDF since it should be
>>>>>>> >             simpler to write and will be more flexible but they can use
>>>>>>> >             the "*Source"-based APIs. Eventually we'll deprecate the
>>>>>>> >             APIs but we will never stop supporting them. Eventually they
>>>>>>> >             should all be migrated to use SDF and if there is another
>>>>>>> >             major Beam version, we'll finally be able to remove them.
>>>>>>> >
>>>>>>> >             On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko
>>>>>>> >             <aromanenko.dev@gmail.com <mailto:
>>>>>>> aromanenko.dev@gmail.com>>
>>>>>>> >             wrote:
>>>>>>> >
>>>>>>> >                 Hi Luke,
>>>>>>> >
>>>>>>> >                 Great to hear about such progress on this!
>>>>>>> >
>>>>>>> >                 Talking about opt-out for all runners in the future,
>>>>>>> >                 will it require any code changes for current
>>>>>>> >                 “*Source”-based IOs, or should the wrappers completely
>>>>>>> >                 smooth this transition?
>>>>>>> >                 Do we need to require that new IOs be based only on
>>>>>>> >                 SDF, or will the wrappers again help to avoid this?
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >     --
>>>>>>> >     Pulasthi S. Wickramasinghe
>>>>>>> >     PhD Candidate  | Research Assistant
>>>>>>> >     School of Informatics and Computing | Digital Science Center
>>>>>>> >     Indiana University, Bloomington
>>>>>>> >     cell: 224-386-9035
>>>>>>> >
>>>>>>>
>>>>>>
>>>
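
The watermark discussion quoted above (the `max(seen timestamps) - allowedLateness` rule attributed to Spark, the low-water-mark behaviour attributed to Beam, and triggers firing too early when holds are not wired in) can be sketched in plain Java. This is a toy model under stated assumptions, not code from Beam or Spark: the class `WatermarkSketch` and all of its method names are hypothetical, timestamps are plain `long` values, and a window is treated as ready to fire as soon as the watermark reaches its end.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model of the two watermark styles; not Beam or Spark code. */
final class WatermarkSketch {
  /** High-water-mark rule from the thread: max(seen timestamps) - allowedLateness. */
  static long highWaterMarkThreshold(List<Long> seenTimestamps, long allowedLateness) {
    if (seenTimestamps.isEmpty()) {
      throw new IllegalArgumentException("need at least one timestamp");
    }
    long max = Long.MIN_VALUE;
    for (long ts : seenTimestamps) {
      max = Math.max(max, ts);
    }
    return max - allowedLateness;
  }

  /** Low-water-mark style: never ahead of the earliest pending element or hold. */
  static long lowWaterMark(List<Long> pendingTimestamps, List<Long> holds) {
    long min = Long.MAX_VALUE;
    for (long ts : pendingTimestamps) {
      min = Math.min(min, ts);
    }
    for (long hold : holds) {
      min = Math.min(min, hold);
    }
    return min;
  }

  /** Simplification: a window may fire once the watermark reaches its end. */
  static boolean windowCanFire(long watermark, long windowEnd) {
    return watermark >= windowEnd;
  }

  public static void main(String[] args) {
    // Data older than this threshold would be dropped under the high-water-mark rule.
    System.out.println(highWaterMarkThreshold(Arrays.asList(100L, 250L, 130L), 50L));

    List<Long> pending = Arrays.asList(250L);
    List<Long> holds = Arrays.asList(120L);
    long windowEnd = 200L;

    // Holds ignored: the watermark jumps to 250 and the window fires too early.
    long ignoringHolds = lowWaterMark(pending, Collections.<Long>emptyList());
    System.out.println(windowCanFire(ignoringHolds, windowEnd));

    // Holds respected: the watermark stays at 120 and the window keeps waiting.
    long respectingHolds = lowWaterMark(pending, holds);
    System.out.println(windowCanFire(respectingHolds, windowEnd));
  }
}
```

The last two calls mirror the symptom reported in the thread: the same pending data produces a firing window when the hold is left out of the watermark computation and a waiting window when it is included.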

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Luke Cwik <lc...@google.com>.
+Rose Nguyen <rt...@google.com> suggested that instead of just a blog,
we should add the majority of the current blog's content to the core
programming guide and either drop the blog and/or have a much smaller blog
that links to the docs.

I think this is a great idea, what do others think?


Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Luke Cwik <lc...@google.com>.
Thanks Alexey, that is correct.

>>>>> Read.Unbounded.
>>>>> >         Twister2[2]: Ready for review. Supports Read.Bounded, the
>>>>> >         current runner doesn't support unbounded pipelines.
>>>>> >         Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes.
>>>>> Not
>>>>> >         certain about level of unbounded pipeline support coverage
>>>>> since
>>>>> >         Spark uses its own tiny suite of tests to get unbounded
>>>>> pipeline
>>>>> >         coverage instead of the validates runner set.
>>>>> >         Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely
>>>>> >         needs additional work.
>>>>> >         Samza[5]: WIP. Supports Read.Bounded. Not certain about
>>>>> level of
>>>>> >         unbounded pipeline support coverage since Spark uses its own
>>>>> >         tiny suite of tests to get unbounded pipeline coverage
>>>>> instead
>>>>> >         of the validates runner set.
>>>>> >         Flink: Unstarted.
>>>>> >
>>>>> >         @Pulasthi Supun Wickramasinghe <mailto:pulasthi911@gmail.com
>>>>> > ,
>>>>> >         can you help me with the Twister2 PR[2]?
>>>>> >         @Ismaël Mejía <ma...@gmail.com>, is PR[3] the
>>>>> expected
>>>>> >         level of support for unbounded pipelines and hence ready for
>>>>> review?
>>>>> >         @Jozsef Bartok <ma...@hazelcast.com>, can you help
>>>>> me out
>>>>> >         to get support for unbounded splittable DoFn's into Jet[4]?
>>>>> >         @Xinyu Liu <ma...@gmail.com>, is PR[5] the
>>>>> expected
>>>>> >         level of support for unbounded pipelines and hence ready for
>>>>> review?
>>>>> >
>>>>> >         1: https://github.com/apache/beam/pull/12519
>>>>> >         2: https://github.com/apache/beam/pull/12594
>>>>> >         3: https://github.com/apache/beam/pull/12603
>>>>> >         4: https://github.com/apache/beam/pull/12616
>>>>> >         5: https://github.com/apache/beam/pull/12617
>>>>> >
>>>>> >         On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <lcwik@google.com
>>>>> >         <ma...@google.com>> wrote:
>>>>> >
>>>>> >             There shouldn't be any changes required since the wrapper
>>>>> >             will smoothly transition the execution to be run as an
>>>>> SDF.
>>>>> >             New IOs should strongly prefer to use SDF since it
>>>>> should be
>>>>> >             simpler to write and will be more flexible but they can
>>>>> use
>>>>> >             the "*Source"-based APIs. Eventually we'll deprecate the
>>>>> >             APIs but we will never stop supporting them. Eventually
>>>>> they
>>>>> >             should all be migrated to use SDF and if there is another
>>>>> >             major Beam version, we'll finally be able to remove them.
>>>>> >
>>>>> >             On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko
>>>>> >             <aromanenko.dev@gmail.com <mailto:
>>>>> aromanenko.dev@gmail.com>>
>>>>> >             wrote:
>>>>> >
>>>>> >                 Hi Luke,
>>>>> >
>>>>> >                 Great to hear about such progress on this!
>>>>> >
>>>>> >                 Talking about opt-out for all runners in the future,
>>>>> >                 will it require any code changes for current
>>>>> >                 “*Source”-based IOs or the wrappers should completely
>>>>> >                 smooth this transition?
>>>>> >                 Do we need to require to create new IOs only based on
>>>>> >                 SDF or again, the wrappers should help to avoid this?
>>>>> >
>>>>> >>                 On 10 Aug 2020, at 22:59, Luke Cwik <
>>>>> lcwik@google.com
>>>>> >>                 <ma...@google.com>> wrote:
>>>>> >>
>>>>> >>                 In the past couple of months wrappers[1, 2] have
>>>>> been
>>>>> >>                 added to the Beam Java SDK which can execute
>>>>> >>                 BoundedSource and UnboundedSource as Splittable
>>>>> DoFns.
>>>>> >>                 These have been opt-out for portable pipelines (e.g.
>>>>> >>                 Dataflow runner v2, XLang pipelines on Flink/Spark)
>>>>> >>                 and opt-in using an experiment for all other
>>>>> pipelines.
>>>>> >>
>>>>> >>                 I would like to start making the non-portable
>>>>> >>                 pipelines starting with the DirectRunner[3] to be
>>>>> >>                 opt-out with the plan that eventually all runners
>>>>> will
>>>>> >>                 only execute splittable DoFns and the
>>>>> >>                 BoundedSource/UnboundedSource specific execution
>>>>> logic
>>>>> >>                 from the runners will be removed.
>>>>> >>
>>>>> >>                 Users will be able to opt-in any pipeline using the
>>>>> >>                 experiment 'use_sdf_read' and opt-out with the
>>>>> >>                 experiment 'use_deprecated_read'. (For portable
>>>>> >>                 pipelines these experiments were 'beam_fn_api' and
>>>>> >>                 'beam_fn_api_use_deprecated_read' respectively and I
>>>>> >>                 have added these two additional aliases to make the
>>>>> >>                 experience less confusing).
>>>>> >>
>>>>> >>                 1:
>>>>> >>
>>>>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
>>>>> >>                 2:
>>>>> >>
>>>>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
>>>>> >>                 3: https://github.com/apache/beam/pull/12519
>>>>> >
>>>>> >
>>>>> >
>>>>> >     --
>>>>> >     Pulasthi S. Wickramasinghe
>>>>> >     PhD Candidate  | Research Assistant
>>>>> >     School of Informatics and Computing | Digital Science Center
>>>>> >     Indiana University, Bloomington
>>>>> >     cell: 224-386-9035
>>>>> >
>>>>>
>>>>
>

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Alexey Romanenko <ar...@gmail.com>.
Thanks Luke. I guess the proper link should be this one:
https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE

> On 13 Oct 2020, at 00:23, Luke Cwik <lc...@google.com> wrote:
> 
> I have a draft[1] of the blog ready. Please take a look.
> 
> 1: http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE#heading=h.tbab2n97o3eo
> On Mon, Oct 5, 2020 at 4:28 PM Luke Cwik <lcwik@google.com <ma...@google.com>> wrote:
> 
> 
> On Mon, Oct 5, 2020 at 3:45 PM Kenneth Knowles <kenn@apache.org <ma...@apache.org>> wrote:
> 
> 
> On Mon, Oct 5, 2020 at 2:44 PM Luke Cwik <lcwik@google.com <ma...@google.com>> wrote:
> For the 2.25 release the Java Direct, Flink, Jet, Samza, Twister2 will use SDF powered Read transforms. Users can opt-out with --experiments=use_deprecated_read.
> 
> Huzzah! In our release notes maybe be clear about the expectations for users:
> 
> Done in https://github.com/apache/beam/pull/13015
>  
>  - semantics are expected to be the same: file bugs for any change in results
>  - perf may vary: file bugs or write to user@
> 
> I was unable to get Spark done for 2.25 as I found out that Spark streaming doesn't support watermark holds[1]. If someone knows more about the watermark system in Spark I could use some guidance here as I believe I have a version of unbounded SDF support written for Spark (I get all the expected output from tests, just that watermarks aren't being held back so PAssert fails).
> 
> Spark's watermarks are not comparable to Beam's. The rule as I understand it is that any data that is later than `max(seen timestamps) - allowedLateness` is dropped. One difference is that dropping is relative to the watermark instead of expiring windows, like early versions of Beam. The other difference is that it tracks the latest event (some call it a "high water mark" because it is the highest datetime value seen) where Beam's watermark is an approximation of the earliest (some call it a "low water mark" because it is a guarantee that it will not dip lower). When I chatted about this with Amit in the early days, it was necessary to implement a Beam-style watermark using Spark state. I think that may still be the case, for correct results.
> 
> 
> In the Spark implementation I saw that watermark holds weren't wired at all to control Spark's watermarks and this was causing triggers to fire too early.
>  
> Also, I started a doc[2] to produce an updated blog post since the original SplittableDoFn blog from 2017 is out of date[3]. I was thinking of making this a new blog post and having the old blog post point to it. We could also remove the old blog post and or update it. Any thoughts?
> 
> New blog post w/ pointer from the old one.
> 
> Finally, I have a clean-up PR[4] that pushes the Read -> primitive Read expansion into each of the runners instead of having it within Read transform within beam-sdks-java-core.
> 
> Approved! I did CC a bunch of runner authors already. I think the important thing is if a default changes we should be sure everyone is OK with the perf changes, and everyone is confident that no incorrect results are produced. The abstractions between sdk-core, runners-core-*, and individual runners is important to me:
> 
>  - The SDK's job is to produce a portable, un-tweaked pipeline so moving flags out of SDK core (and IOs) ASAP is super important.
>  - The runner's job is to execute that pipeline, if they can, however they want. If a runner wants to run Read transforms differently/directly that is fine. If a runner is incapable of supporting SDF, then Read is better than nothing. Etc.
>  - The runners-core-* job is to just be internal libraries for runner authors to share code, and should not make any decisions about the Beam model, etc.
> 
> Kenn
> 
> 1: https://github.com/apache/beam/pull/12603
> 2: http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
> 3: https://beam.apache.org/blog/splittable-do-fn/
> 4: https://github.com/apache/beam/pull/13006
> 
> 
> On Fri, Aug 28, 2020 at 1:45 AM Maximilian Michels <mxm@apache.org <ma...@apache.org>> wrote:
> Thanks Luke! I've had a pass.
> 
> -Max
> 
> On 28.08.20 01:22, Luke Cwik wrote:
> > As an update.
> > 
> > Direct and Twister2 are done.
> > Samza: is ready for review[1].
> > Flink: is almost ready for review. [2] lays all the groundwork for the 
> > migration and [3] finishes the migration (there is a timeout happening 
> > in FlinkSubmissionTest that I'm trying to figure out).
> > No further updates on Spark[4] or Jet[5].
> > 
> > @Maximilian Michels <mailto:mxm@apache.org <ma...@apache.org>> or @thw@apache.org <ma...@apache.org> 
> > <mailto:thomas.weise@gmail.com <ma...@gmail.com>>, can either of you take a look at the 
> > Flink PRs?
> > @ke.wu.cs@icloud.com <ma...@icloud.com> <mailto:ke.wu.cs@icloud.com <ma...@icloud.com>>, Since Xinyu delegated 
> > to you, can you take another look at the Samza PR?
> > 
> > 1: https://github.com/apache/beam/pull/12617
> > 2: https://github.com/apache/beam/pull/12706
> > 3: https://github.com/apache/beam/pull/12708
> > 4: https://github.com/apache/beam/pull/12603
> > 5: https://github.com/apache/beam/pull/12616
> > 
> > On Tue, Aug 18, 2020 at 11:42 AM Pulasthi Supun Wickramasinghe 
> > <pulasthi911@gmail.com <ma...@gmail.com> <mailto:pulasthi911@gmail.com <ma...@gmail.com>>> wrote:
> > 
> >     Hi Luke
> > 
> >     Will take a look at this as soon as possible and get back to you.
> > 
> >     Best Regards,
> >     Pulasthi
> > 
> >     On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik <lcwik@google.com <ma...@google.com>
> >     <mailto:lcwik@google.com <ma...@google.com>>> wrote:
> > 
> >         I have made some good progress here and have gotten to the
> >         following state for non-portable runners:
> > 
> >         DirectRunner[1]: Merged. Supports Read.Bounded and Read.Unbounded.
> >         Twister2[2]: Ready for review. Supports Read.Bounded, the
> >         current runner doesn't support unbounded pipelines.
> >         Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not
> >         certain about level of unbounded pipeline support coverage since
> >         Spark uses its own tiny suite of tests to get unbounded pipeline
> >         coverage instead of the validates runner set.
> >         Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely
> >         needs additional work.
> >         Samza[5]: WIP. Supports Read.Bounded. Not certain about level of
> >         unbounded pipeline support coverage since Spark uses its own
> >         tiny suite of tests to get unbounded pipeline coverage instead
> >         of the validates runner set.
> >         Flink: Unstarted.
> > 
> >         @Pulasthi Supun Wickramasinghe <mailto:pulasthi911@gmail.com <ma...@gmail.com>> ,
> >         can you help me with the Twister2 PR[2]?
> >         @Ismaël Mejía <mailto:iemejia@gmail.com <ma...@gmail.com>>, is PR[3] the expected
> >         level of support for unbounded pipelines and hence ready for review?
> >         @Jozsef Bartok <mailto:jozsi@hazelcast.com <ma...@hazelcast.com>>, can you help me out
> >         to get support for unbounded splittable DoFn's into Jet[4]?
> >         @Xinyu Liu <mailto:xinyuliu.us@gmail.com <ma...@gmail.com>>, is PR[5] the expected
> >         level of support for unbounded pipelines and hence ready for review?
> > 
> >         1: https://github.com/apache/beam/pull/12519
> >         2: https://github.com/apache/beam/pull/12594
> >         3: https://github.com/apache/beam/pull/12603
> >         4: https://github.com/apache/beam/pull/12616
> >         5: https://github.com/apache/beam/pull/12617
> > 
> >         On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <lcwik@google.com <ma...@google.com>
> >         <mailto:lcwik@google.com <ma...@google.com>>> wrote:
> > 
> >             There shouldn't be any changes required since the wrapper
> >             will smoothly transition the execution to be run as an SDF.
> >             New IOs should strongly prefer to use SDF since it should be
> >             simpler to write and will be more flexible but they can use
> >             the "*Source"-based APIs. Eventually we'll deprecate the
> >             APIs but we will never stop supporting them. Eventually they
> >             should all be migrated to use SDF and if there is another
> >             major Beam version, we'll finally be able to remove them.
> > 
> >             On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko
> >             <aromanenko.dev@gmail.com <ma...@gmail.com> <mailto:aromanenko.dev@gmail.com <ma...@gmail.com>>>
> >             wrote:
> > 
> >                 Hi Luke,
> > 
> >                 Great to hear about such progress on this!
> > 
> >                 Talking about opt-out for all runners in the future,
> >                 will it require any code changes for current
> >                 “*Source”-based IOs or the wrappers should completely
> >                 smooth this transition?
> >                 Do we need to require to create new IOs only based on
> >                 SDF or again, the wrappers should help to avoid this?
> > 
> >>                 On 10 Aug 2020, at 22:59, Luke Cwik <lcwik@google.com <ma...@google.com>
> >>                 <mailto:lcwik@google.com <ma...@google.com>>> wrote:
> >>
> >>                 In the past couple of months wrappers[1, 2] have been
> >>                 added to the Beam Java SDK which can execute
> >>                 BoundedSource and UnboundedSource as Splittable DoFns.
> >>                 These have been opt-out for portable pipelines (e.g.
> >>                 Dataflow runner v2, XLang pipelines on Flink/Spark)
> >>                 and opt-in using an experiment for all other pipelines.
> >>
> >>                 I would like to start making the non-portable
> >>                 pipelines starting with the DirectRunner[3] to be
> >>                 opt-out with the plan that eventually all runners will
> >>                 only execute splittable DoFns and the
> >>                 BoundedSource/UnboundedSource specific execution logic
> >>                 from the runners will be removed.
> >>
> >>                 Users will be able to opt-in any pipeline using the
> >>                 experiment 'use_sdf_read' and opt-out with the
> >>                 experiment 'use_deprecated_read'. (For portable
> >>                 pipelines these experiments were 'beam_fn_api' and
> >>                 'beam_fn_api_use_deprecated_read' respectively and I
> >>                 have added these two additional aliases to make the
> >>                 experience less confusing).
> >>
> >>                 1:
> >>                 https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
> >>                 2:
> >>                 https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
> >>                 3: https://github.com/apache/beam/pull/12519
> > 
> > 
> > 
> >     -- 
> >     Pulasthi S. Wickramasinghe
> >     PhD Candidate  | Research Assistant
> >     School of Informatics and Computing | Digital Science Center
> >     Indiana University, Bloomington
> >     cell: 224-386-9035
> > 


Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Luke Cwik <lc...@google.com>.
I have a draft[1] of the blog ready. Please take a look.

1:
http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE#heading=h.tbab2n97o3eo

On Mon, Oct 5, 2020 at 4:28 PM Luke Cwik <lc...@google.com> wrote:

>
>
> On Mon, Oct 5, 2020 at 3:45 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>>
>>
>> On Mon, Oct 5, 2020 at 2:44 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> For the 2.25 release the Java Direct, Flink, Jet, Samza, Twister2 will
>>> use SDF powered Read transforms. Users can opt-out
>>> with --experiments=use_deprecated_read.
>>>
>>
>> Huzzah! In our release notes maybe be clear about the expectations for
>> users:
>>
>> Done in https://github.com/apache/beam/pull/13015
>
>
>>  - semantics are expected to be the same: file bugs for any change in
>> results
>>  - perf may vary: file bugs or write to user@
>>
>> I was unable to get Spark done for 2.25 as I found out that Spark
>>> streaming doesn't support watermark holds[1]. If someone knows more about
>>> the watermark system in Spark I could use some guidance here as I believe I
>>> have a version of unbounded SDF support written for Spark (I get all the
>>> expected output from tests, just that watermarks aren't being held back so
>>> PAssert fails).
>>>
>>
>> Spark's watermarks are not comparable to Beam's. The rule as I understand
>> it is that any data that is later than `max(seen timestamps) -
>> allowedLateness` is dropped. One difference is that dropping is relative to
>> the watermark instead of expiring windows, like early versions of Beam. The
>> other difference is that it tracks the latest event (some call it a "high
>> water mark" because it is the highest datetime value seen) where Beam's
>> watermark is an approximation of the earliest (some call it a "low water
>> mark" because it is a guarantee that it will not dip lower). When I chatted
>> about this with Amit in the early days, it was necessary to implement a
>> Beam-style watermark using Spark state. I think that may still be the case,
>> for correct results.
>>
>>
> In the Spark implementation I saw that watermark holds weren't wired at
> all to control Spark's watermarks and this was causing triggers to fire too
> early.
>
>
>> Also, I started a doc[2] to produce an updated blog post since the
>>> original SplittableDoFn blog from 2017 is out of date[3]. I was thinking of
>>> making this a new blog post and having the old blog post point to it. We
>>> could also remove the old blog post and or update it. Any thoughts?
>>>
>>
>> New blog post w/ pointer from the old one.
>>
>> Finally, I have a clean-up PR[4] that pushes the Read -> primitive Read
>>> expansion into each of the runners instead of having it within Read
>>> transform within beam-sdks-java-core.
>>>
>>
>> Approved! I did CC a bunch of runner authors already. I think the
>> important thing is if a default changes we should be sure everyone is OK
>> with the perf changes, and everyone is confident that no incorrect results
>> are produced. The abstractions between sdk-core, runners-core-*, and
>> individual runners is important to me:
>>
>>  - The SDK's job is to produce a portable, un-tweaked pipeline so moving
>> flags out of SDK core (and IOs) ASAP is super important.
>>  - The runner's job is to execute that pipeline, if they can, however
>> they want. If a runner wants to run Read transforms differently/directly
>> that is fine. If a runner is incapable of supporting SDF, then Read is
>> better than nothing. Etc.
>>  - The runners-core-* job is to just be internal libraries for runner
>> authors to share code, and should not make any decisions about the Beam
>> model, etc.
>>
>> Kenn
>>
>> 1: https://github.com/apache/beam/pull/12603
>>> 2: http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
>>> 3: https://beam.apache.org/blog/splittable-do-fn/
>>> 4: https://github.com/apache/beam/pull/13006
>>>
>>>
>>> On Fri, Aug 28, 2020 at 1:45 AM Maximilian Michels <mx...@apache.org>
>>> wrote:
>>>
>>>> Thanks Luke! I've had a pass.
>>>>
>>>> -Max
>>>>
>>>> On 28.08.20 01:22, Luke Cwik wrote:
>>>> > As an update.
>>>> >
>>>> > Direct and Twister2 are done.
>>>> > Samza: is ready for review[1].
>>>> > Flink: is almost ready for review. [2] lays all the groundwork for
>>>> the
>>>> > migration and [3] finishes the migration (there is a timeout
>>>> happening
>>>> > in FlinkSubmissionTest that I'm trying to figure out).
>>>> > No further updates on Spark[4] or Jet[5].
>>>> >
>>>> > @Maximilian Michels <ma...@apache.org> or @thw@apache.org
>>>> > <ma...@gmail.com>, can either of you take a look at
>>>> the
>>>> > Flink PRs?
>>>> > @ke.wu.cs@icloud.com <ma...@icloud.com>, Since Xinyu
>>>> delegated
>>>> > to you, can you take another look at the Samza PR?
>>>> >
>>>> > 1: https://github.com/apache/beam/pull/12617
>>>> > 2: https://github.com/apache/beam/pull/12706
>>>> > 3: https://github.com/apache/beam/pull/12708
>>>> > 4: https://github.com/apache/beam/pull/12603
>>>> > 5: https://github.com/apache/beam/pull/12616
>>>> >
>>>> > On Tue, Aug 18, 2020 at 11:42 AM Pulasthi Supun Wickramasinghe
>>>> > <pulasthi911@gmail.com <ma...@gmail.com>> wrote:
>>>> >
>>>> >     Hi Luke
>>>> >
>>>> >     Will take a look at this as soon as possible and get back to you.
>>>> >
>>>> >     Best Regards,
>>>> >     Pulasthi
>>>> >
>>>> >     On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik <lcwik@google.com
>>>> >     <ma...@google.com>> wrote:
>>>> >
>>>> >         I have made some good progress here and have gotten to the
>>>> >         following state for non-portable runners:
>>>> >
>>>> >         DirectRunner[1]: Merged. Supports Read.Bounded and
>>>> Read.Unbounded.
>>>> >         Twister2[2]: Ready for review. Supports Read.Bounded, the
>>>> >         current runner doesn't support unbounded pipelines.
>>>> >         Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes.
>>>> Not
>>>> >         certain about level of unbounded pipeline support coverage
>>>> since
>>>> >         Spark uses its own tiny suite of tests to get unbounded
>>>> pipeline
>>>> >         coverage instead of the validates runner set.
>>>> >         Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely
>>>> >         needs additional work.
>>>> >         Samza[5]: WIP. Supports Read.Bounded. Not certain about level
>>>> of
>>>> >         unbounded pipeline support coverage since Spark uses its own
>>>> >         tiny suite of tests to get unbounded pipeline coverage instead
>>>> >         of the validates runner set.
>>>> >         Flink: Unstarted.
>>>> >
>>>> >         @Pulasthi Supun Wickramasinghe <mailto:pulasthi911@gmail.com
>>>> > ,
>>>> >         can you help me with the Twister2 PR[2]?
>>>> >         @Ismaël Mejía <ma...@gmail.com>, is PR[3] the
>>>> expected
>>>> >         level of support for unbounded pipelines and hence ready for
>>>> review?
>>>> >         @Jozsef Bartok <ma...@hazelcast.com>, can you help me
>>>> out
>>>> >         to get support for unbounded splittable DoFn's into Jet[4]?
>>>> >         @Xinyu Liu <ma...@gmail.com>, is PR[5] the
>>>> expected
>>>> >         level of support for unbounded pipelines and hence ready for
>>>> review?
>>>> >
>>>> >         1: https://github.com/apache/beam/pull/12519
>>>> >         2: https://github.com/apache/beam/pull/12594
>>>> >         3: https://github.com/apache/beam/pull/12603
>>>> >         4: https://github.com/apache/beam/pull/12616
>>>> >         5: https://github.com/apache/beam/pull/12617
>>>> >
>>>> >         On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <lcwik@google.com
>>>> >         <ma...@google.com>> wrote:
>>>> >
>>>> >             There shouldn't be any changes required since the wrapper
>>>> >             will smoothly transition the execution to be run as an
>>>> SDF.
>>>> >             New IOs should strongly prefer to use SDF since it should
>>>> be
>>>> >             simpler to write and will be more flexible but they can
>>>> use
>>>> >             the "*Source"-based APIs. Eventually we'll deprecate the
>>>> >             APIs but we will never stop supporting them. Eventually
>>>> they
>>>> >             should all be migrated to use SDF and if there is another
>>>> >             major Beam version, we'll finally be able to remove them.
>>>> >
>>>> >             On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko
>>>> >             <aromanenko.dev@gmail.com <mailto:
>>>> aromanenko.dev@gmail.com>>
>>>> >             wrote:
>>>> >
>>>> >                 Hi Luke,
>>>> >
>>>> >                 Great to hear about such progress on this!
>>>> >
>>>> >                 Talking about opt-out for all runners in the future,
>>>> >                 will it require any code changes for current
>>>> >                 “*Source”-based IOs or the wrappers should completely
>>>> >                 smooth this transition?
>>>> >                 Do we need to require to create new IOs only based on
>>>> >                 SDF or again, the wrappers should help to avoid this?
>>>> >
>>>> >>                 On 10 Aug 2020, at 22:59, Luke Cwik <
>>>> lcwik@google.com
>>>> >>                 <ma...@google.com>> wrote:
>>>> >>
>>>> >>                 In the past couple of months wrappers[1, 2] have been
>>>> >>                 added to the Beam Java SDK which can execute
>>>> >>                 BoundedSource and UnboundedSource as Splittable
>>>> DoFns.
>>>> >>                 These have been opt-out for portable pipelines (e.g.
>>>> >>                 Dataflow runner v2, XLang pipelines on Flink/Spark)
>>>> >>                 and opt-in using an experiment for all other
>>>> pipelines.
>>>> >>
>>>> >>                 I would like to start making the non-portable
>>>> >>                 pipelines starting with the DirectRunner[3] to be
>>>> >>                 opt-out with the plan that eventually all runners
>>>> will
>>>> >>                 only execute splittable DoFns and the
>>>> >>                 BoundedSource/UnboundedSource specific execution
>>>> logic
>>>> >>                 from the runners will be removed.
>>>> >>
>>>> >>                 Users will be able to opt-in any pipeline using the
>>>> >>                 experiment 'use_sdf_read' and opt-out with the
>>>> >>                 experiment 'use_deprecated_read'. (For portable
>>>> >>                 pipelines these experiments were 'beam_fn_api' and
>>>> >>                 'beam_fn_api_use_deprecated_read' respectively and I
>>>> >>                 have added these two additional aliases to make the
>>>> >>                 experience less confusing).
>>>> >>
>>>> >>                 1:
>>>> >>
>>>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
>>>> >>                 2:
>>>> >>
>>>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
>>>> >>                 3: https://github.com/apache/beam/pull/12519
>>>> >
>>>> >
>>>> >
>>>> >     --
>>>> >     Pulasthi S. Wickramasinghe
>>>> >     PhD Candidate  | Research Assistant
>>>> >     School of Informatics and Computing | Digital Science Center
>>>> >     Indiana University, Bloomington
>>>> >     cell: 224-386-9035
>>>> >
>>>>
>>>

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Luke Cwik <lc...@google.com>.
On Mon, Oct 5, 2020 at 3:45 PM Kenneth Knowles <ke...@apache.org> wrote:

>
>
> On Mon, Oct 5, 2020 at 2:44 PM Luke Cwik <lc...@google.com> wrote:
>
>> For the 2.25 release the Java Direct, Flink, Jet, Samza, Twister2 will
>> use SDF powered Read transforms. Users can opt-out
>> with --experiments=use_deprecated_read.
>>
>
> Huzzah! In our release notes maybe be clear about the expectations for
> users:
>
Done in https://github.com/apache/beam/pull/13015


>  - semantics are expected to be the same: file bugs for any change in
> results
>  - perf may vary: file bugs or write to user@
>
>> I was unable to get Spark done for 2.25 as I found out that Spark
>> streaming doesn't support watermark holds[1]. If someone knows more about
>> the watermark system in Spark I could use some guidance here as I believe I
>> have a version of unbounded SDF support written for Spark (I get all the
>> expected output from tests, just that watermarks aren't being held back so
>> PAssert fails).
>>
>
> Spark's watermarks are not comparable to Beam's. The rule as I understand
> it is that any data that is later than `max(seen timestamps) -
> allowedLateness` is dropped. One difference is that dropping is relative to
> the watermark instead of expiring windows, like early versions of Beam. The
> other difference is that it tracks the latest event (some call it a "high
> water mark" because it is the highest datetime value seen) where Beam's
> watermark is an approximation of the earliest (some call it a "low water
> mark" because it is a guarantee that it will not dip lower). When I chatted
> about this with Amit in the early days, it was necessary to implement a
> Beam-style watermark using Spark state. I think that may still be the case,
> for correct results.
>
>
In the Spark implementation I saw that watermark holds weren't wired up at
all to control Spark's watermarks, and this was causing triggers to fire too
early.
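
To illustrate why that matters, here is a toy model in plain Java. It is not
the Spark runner's or Beam's actual code: the class and method names are
hypothetical, and "a trigger fires once the watermark reaches the end of the
window" is a simplification. The idea is that an honored hold caps the output
watermark at the earliest timestamp that is still owed, so the window waits.

```java
import java.util.OptionalLong;

/** Toy model of a watermark hold; hypothetical names, not Beam or Spark code. */
public class WatermarkHoldSketch {

  /** Output watermark when holds are honored: it may not advance past the earliest hold. */
  public static long outputWatermark(long inputWatermark, OptionalLong earliestHold) {
    return earliestHold.isPresent()
        ? Math.min(inputWatermark, earliestHold.getAsLong())
        : inputWatermark;
  }

  /** Simplified event-time trigger: fires once the watermark reaches the window end. */
  public static boolean windowFires(long watermark, long windowEnd) {
    return watermark >= windowEnd;
  }

  public static void main(String[] args) {
    long inputWatermark = 10; // upstream claims all data before t=10 has arrived
    long windowEnd = 10;      // window [0, 10)
    OptionalLong hold = OptionalLong.of(5); // the SDF still owes output at t=5

    // Hold not wired in: the window fires although output at t=5 is pending.
    long unheld = outputWatermark(inputWatermark, OptionalLong.empty());
    System.out.println("fires without hold: " + windowFires(unheld, windowEnd));

    // Hold honored: the watermark stays at 5, so the window keeps waiting.
    long held = outputWatermark(inputWatermark, hold);
    System.out.println("fires with hold: " + windowFires(held, windowEnd));
  }
}
```

In this model the first case is the "fires too early" behavior: the pending
output at t=5 would arrive after the window had already produced its result.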


>> Also, I started a doc[2] to produce an updated blog post since the
>> original SplittableDoFn blog from 2017 is out of date[3]. I was thinking of
>> making this a new blog post and having the old blog post point to it. We
>> could also remove the old blog post and/or update it. Any thoughts?
>>
>
> New blog post w/ pointer from the old one.
>
>> Finally, I have a clean-up PR[4] that pushes the Read -> primitive Read
>> expansion into each of the runners instead of having it within Read
>> transform within beam-sdks-java-core.
>>
>
> Approved! I did CC a bunch of runner authors already. I think the
> important thing is if a default changes we should be sure everyone is OK
> with the perf changes, and everyone is confident that no incorrect results
> are produced. The abstractions between sdk-core, runners-core-*, and
> individual runners are important to me:
>
>  - The SDK's job is to produce a portable, un-tweaked pipeline so moving
> flags out of SDK core (and IOs) ASAP is super important.
>  - The runner's job is to execute that pipeline, if they can, however they
> want. If a runner wants to run Read transforms differently/directly that is
> fine. If a runner is incapable of supporting SDF, then Read is better than
> nothing. Etc.
>  - The runners-core-* job is to just be internal libraries for runner
> authors to share code, and should not make any decisions about the Beam
> model, etc.
>
> Kenn
>
>> 1: https://github.com/apache/beam/pull/12603
>> 2: http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
>> 3: https://beam.apache.org/blog/splittable-do-fn/
>> 4: https://github.com/apache/beam/pull/13006
>>
>>
>> On Fri, Aug 28, 2020 at 1:45 AM Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> Thanks Luke! I've had a pass.
>>>
>>> -Max
>>>
>>> On 28.08.20 01:22, Luke Cwik wrote:
>>> > As an update.
>>> >
>>> > Direct and Twister2 are done.
>>> > Samza: is ready for review[1].
>>> > Flink: is almost ready for review. [2] lays all the groundwork for the
>>> > migration and [3] finishes the migration (there is a timeout happening
>>> > in FlinkSubmissionTest that I'm trying to figure out).
>>> > No further updates on Spark[4] or Jet[5].
>>> >
>>> > @Maximilian Michels <ma...@apache.org> or @thw@apache.org
>>> > <ma...@gmail.com>, can either of you take a look at the
>>> > Flink PRs?
>>> > @ke.wu.cs@icloud.com <ma...@icloud.com>, Since Xinyu
>>> delegated
>>> > to you, can you take another look at the Samza PR?
>>> >
>>> > 1: https://github.com/apache/beam/pull/12617
>>> > 2: https://github.com/apache/beam/pull/12706
>>> > 3: https://github.com/apache/beam/pull/12708
>>> > 4: https://github.com/apache/beam/pull/12603
>>> > 5: https://github.com/apache/beam/pull/12616
>>> >
>>> > On Tue, Aug 18, 2020 at 11:42 AM Pulasthi Supun Wickramasinghe
>>> > <pulasthi911@gmail.com <ma...@gmail.com>> wrote:
>>> >
>>> >     Hi Luke
>>> >
>>> >     Will take a look at this as soon as possible and get back to you.
>>> >
>>> >     Best Regards,
>>> >     Pulasthi
>>> >
>>> >     On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik <lcwik@google.com
>>> >     <ma...@google.com>> wrote:
>>> >
>>> >         I have made some good progress here and have gotten to the
>>> >         following state for non-portable runners:
>>> >
>>> >         DirectRunner[1]: Merged. Supports Read.Bounded and
>>> Read.Unbounded.
>>> >         Twister2[2]: Ready for review. Supports Read.Bounded, the
>>> >         current runner doesn't support unbounded pipelines.
>>> >         Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not
>>> >         certain about level of unbounded pipeline support coverage
>>> since
>>> >         Spark uses its own tiny suite of tests to get unbounded
>>> pipeline
>>> >         coverage instead of the validates runner set.
>>> >         Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely
>>> >         needs additional work.
>>> >         Samza[5]: WIP. Supports Read.Bounded. Not certain about level
>>> of
>>> >         unbounded pipeline support coverage since Spark uses its own
>>> >         tiny suite of tests to get unbounded pipeline coverage instead
>>> >         of the validates runner set.
>>> >         Flink: Unstarted.
>>> >
>>> >         @Pulasthi Supun Wickramasinghe <mailto:pulasthi911@gmail.com
>>> > ,
>>> >         can you help me with the Twister2 PR[2]?
>>> >         @Ismaël Mejía <ma...@gmail.com>, is PR[3] the
>>> expected
>>> >         level of support for unbounded pipelines and hence ready for
>>> review?
>>> >         @Jozsef Bartok <ma...@hazelcast.com>, can you help me
>>> out
>>> >         to get support for unbounded splittable DoFn's into Jet[4]?
>>> >         @Xinyu Liu <ma...@gmail.com>, is PR[5] the
>>> expected
>>> >         level of support for unbounded pipelines and hence ready for
>>> review?
>>> >
>>> >         1: https://github.com/apache/beam/pull/12519
>>> >         2: https://github.com/apache/beam/pull/12594
>>> >         3: https://github.com/apache/beam/pull/12603
>>> >         4: https://github.com/apache/beam/pull/12616
>>> >         5: https://github.com/apache/beam/pull/12617
>>> >
>>> >         On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <lcwik@google.com
>>> >         <ma...@google.com>> wrote:
>>> >
>>> >             There shouldn't be any changes required since the wrapper
>>> >             will smoothly transition the execution to be run as an SDF.
>>> >             New IOs should strongly prefer to use SDF since it should
>>> be
>>> >             simpler to write and will be more flexible but they can use
>>> >             the "*Source"-based APIs. Eventually we'll deprecate the
>>> >             APIs but we will never stop supporting them. Eventually
>>> they
>>> >             should all be migrated to use SDF and if there is another
>>> >             major Beam version, we'll finally be able to remove them.
>>> >
>>> >             On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko
>>> >             <aromanenko.dev@gmail.com <mailto:aromanenko.dev@gmail.com
>>> >>
>>> >             wrote:
>>> >
>>> >                 Hi Luke,
>>> >
>>> >                 Great to hear about such progress on this!
>>> >
>>> >                 Talking about opt-out for all runners in the future,
>>> >                 will it require any code changes for current
>>> >                 “*Source”-based IOs or the wrappers should completely
>>> >                 smooth this transition?
>>> >                 Do we need to require to create new IOs only based on
>>> >                 SDF or again, the wrappers should help to avoid this?
>>> >
>>> >>                 On 10 Aug 2020, at 22:59, Luke Cwik <lcwik@google.com
>>> >>                 <ma...@google.com>> wrote:
>>> >>
>>> >>                 In the past couple of months wrappers[1, 2] have been
>>> >>                 added to the Beam Java SDK which can execute
>>> >>                 BoundedSource and UnboundedSource as Splittable DoFns.
>>> >>                 These have been opt-out for portable pipelines (e.g.
>>> >>                 Dataflow runner v2, XLang pipelines on Flink/Spark)
>>> >>                 and opt-in using an experiment for all other
>>> pipelines.
>>> >>
>>> >>                 I would like to start making the non-portable
>>> >>                 pipelines starting with the DirectRunner[3] to be
>>> >>                 opt-out with the plan that eventually all runners will
>>> >>                 only execute splittable DoFns and the
>>> >>                 BoundedSource/UnboundedSource specific execution logic
>>> >>                 from the runners will be removed.
>>> >>
>>> >>                 Users will be able to opt-in any pipeline using the
>>> >>                 experiment 'use_sdf_read' and opt-out with the
>>> >>                 experiment 'use_deprecated_read'. (For portable
>>> >>                 pipelines these experiments were 'beam_fn_api' and
>>> >>                 'beam_fn_api_use_deprecated_read' respectively and I
>>> >>                 have added these two additional aliases to make the
>>> >>                 experience less confusing).
>>> >>
>>> >>                 1:
>>> >>
>>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
>>> >>                 2:
>>> >>
>>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
>>> >>                 3: https://github.com/apache/beam/pull/12519
>>> >
>>> >
>>> >
>>> >     --
>>> >     Pulasthi S. Wickramasinghe
>>> >     PhD Candidate  | Research Assistant
>>> >     School of Informatics and Computing | Digital Science Center
>>> >     Indiana University, Bloomington
>>> >     cell: 224-386-9035
>>> >
>>>
>>

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Kenneth Knowles <ke...@apache.org>.
On Mon, Oct 5, 2020 at 2:44 PM Luke Cwik <lc...@google.com> wrote:

> For the 2.25 release the Java Direct, Flink, Jet, Samza, Twister2 will use
> SDF powered Read transforms. Users can opt-out
> with --experiments=use_deprecated_read.
>

Huzzah! In our release notes, maybe we should be clear about the
expectations for users:

 - semantics are expected to be the same: file bugs for any change in
results
 - perf may vary: file bugs or write to user@

> I was unable to get Spark done for 2.25 as I found out that Spark streaming
> doesn't support watermark holds[1]. If someone knows more about the
> watermark system in Spark I could use some guidance here as I believe I
> have a version of unbounded SDF support written for Spark (I get all the
> expected output from tests, just that watermarks aren't being held back so
> PAssert fails).
>

Spark's watermarks are not comparable to Beam's. The rule as I understand
it is that any data whose timestamp is earlier than `max(seen timestamps) -
allowedLateness` is dropped. One difference is that dropping is relative to
the watermark instead of expiring windows, like early versions of Beam. The
other difference is that it tracks the latest event (some call it a "high
water mark" because it is the highest datetime value seen) where Beam's
watermark is an approximation of the earliest (some call it a "low water
mark" because it is a guarantee that it will not dip lower). When I chatted
about this with Amit in the early days, it was necessary to implement a
Beam-style watermark using Spark state. I think that may still be the case,
for correct results.
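
To make the difference concrete, here is a minimal sketch in plain Java. It
uses no Spark or Beam APIs and the names are made up for illustration. It
assumes the Spark-style rule is "drop anything older than max(seen
timestamps) - allowedLateness" as described above, and it models a Beam-style
low watermark simply as the minimum timestamp still pending, which is a
simplification of what a real runner estimates.

```java
import java.util.Arrays;
import java.util.List;

/** Toy contrast of a "high water mark" drop rule with a "low water mark". */
public class WatermarkSketch {

  /** Highest timestamp seen so far (the "high water mark"). */
  public static long highWaterMark(List<Long> seenTimestamps) {
    long max = Long.MIN_VALUE;
    for (long ts : seenTimestamps) {
      max = Math.max(max, ts);
    }
    return max;
  }

  /** Lower bound on future timestamps, modeled as the min of pending elements. */
  public static long lowWaterMark(List<Long> pendingTimestamps) {
    long min = Long.MAX_VALUE;
    for (long ts : pendingTimestamps) {
      min = Math.min(min, ts);
    }
    return min;
  }

  /** Assumed Spark-style rule: drop data older than max(seen) - allowedLateness. */
  public static boolean droppedByHighWaterMark(long ts, List<Long> seen, long allowedLateness) {
    return ts < highWaterMark(seen) - allowedLateness;
  }

  /** Low-water-mark rule: data is late only if it is behind the low watermark. */
  public static boolean lateByLowWaterMark(long ts, long lowWatermark) {
    return ts < lowWatermark;
  }

  public static void main(String[] args) {
    List<Long> seen = Arrays.asList(100L, 105L, 200L); // one fast element at t=200
    List<Long> pending = Arrays.asList(110L, 150L);    // slower elements still in flight
    long allowedLateness = 10;
    long incoming = 120;

    System.out.println("dropped by high water mark rule: "
        + droppedByHighWaterMark(incoming, seen, allowedLateness));
    System.out.println("late by low water mark rule: "
        + lateByLowWaterMark(incoming, lowWaterMark(pending)));
  }
}
```

With these made-up numbers a single fast element at t=200 pushes the high
water mark far enough that the element at t=120 is dropped, while the low
water mark is still at 110 and would accept it.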

> Also, I started a doc[2] to produce an updated blog post since the original
> SplittableDoFn blog from 2017 is out of date[3]. I was thinking of making
> this a new blog post and having the old blog post point to it. We could
> also remove the old blog post and/or update it. Any thoughts?
>

New blog post w/ pointer from the old one.

> Finally, I have a clean-up PR[4] that pushes the Read -> primitive Read
> expansion into each of the runners instead of having it within Read
> transform within beam-sdks-java-core.
>

Approved! I did CC a bunch of runner authors already. I think the important
thing is that if a default changes, we should be sure everyone is OK with the
perf changes, and everyone is confident that no incorrect results are
produced. The abstractions between sdk-core, runners-core-*, and individual
runners are important to me:

 - The SDK's job is to produce a portable, un-tweaked pipeline so moving
flags out of SDK core (and IOs) ASAP is super important.
 - The runner's job is to execute that pipeline, if they can, however they
want. If a runner wants to run Read transforms differently/directly that is
fine. If a runner is incapable of supporting SDF, then Read is better than
nothing. Etc.
 - The runners-core-* job is to just be internal libraries for runner
authors to share code, and should not make any decisions about the Beam
model, etc.

Kenn

> 1: https://github.com/apache/beam/pull/12603
> 2: http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
> 3: https://beam.apache.org/blog/splittable-do-fn/
> 4: https://github.com/apache/beam/pull/13006
>
>
> On Fri, Aug 28, 2020 at 1:45 AM Maximilian Michels <mx...@apache.org> wrote:
>
>> Thanks Luke! I've had a pass.
>>
>> -Max
>>
>> On 28.08.20 01:22, Luke Cwik wrote:
>> > As an update.
>> >
>> > Direct and Twister2 are done.
>> > Samza: is ready for review[1].
>> > Flink: is almost ready for review. [2] lays all the groundwork for the
>> > migration and [3] finishes the migration (there is a timeout happening
>> > in FlinkSubmissionTest that I'm trying to figure out).
>> > No further updates on Spark[4] or Jet[5].
>> >
>> > @Maximilian Michels <ma...@apache.org> or @thw@apache.org
>> > <ma...@gmail.com>, can either of you take a look at the
>> > Flink PRs?
>> > @ke.wu.cs@icloud.com <ma...@icloud.com>, Since Xinyu
>> delegated
>> > to you, can you take another look at the Samza PR?
>> >
>> > 1: https://github.com/apache/beam/pull/12617
>> > 2: https://github.com/apache/beam/pull/12706
>> > 3: https://github.com/apache/beam/pull/12708
>> > 4: https://github.com/apache/beam/pull/12603
>> > 5: https://github.com/apache/beam/pull/12616
>> >
>> > On Tue, Aug 18, 2020 at 11:42 AM Pulasthi Supun Wickramasinghe
>> > <pulasthi911@gmail.com <ma...@gmail.com>> wrote:
>> >
>> >     Hi Luke
>> >
>> >     Will take a look at this as soon as possible and get back to you.
>> >
>> >     Best Regards,
>> >     Pulasthi
>> >
>> >     On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik <lcwik@google.com
>> >     <ma...@google.com>> wrote:
>> >
>> >         I have made some good progress here and have gotten to the
>> >         following state for non-portable runners:
>> >
>> >         DirectRunner[1]: Merged. Supports Read.Bounded and
>> Read.Unbounded.
>> >         Twister2[2]: Ready for review. Supports Read.Bounded, the
>> >         current runner doesn't support unbounded pipelines.
>> >         Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not
>> >         certain about level of unbounded pipeline support coverage since
>> >         Spark uses its own tiny suite of tests to get unbounded pipeline
>> >         coverage instead of the validates runner set.
>> >         Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely
>> >         needs additional work.
>> >         Samza[5]: WIP. Supports Read.Bounded. Not certain about level of
>> >         unbounded pipeline support coverage since Spark uses its own
>> >         tiny suite of tests to get unbounded pipeline coverage instead
>> >         of the validates runner set.
>> >         Flink: Unstarted.
>> >
>> >         @Pulasthi Supun Wickramasinghe <ma...@gmail.com> ,
>> >         can you help me with the Twister2 PR[2]?
>> >         @Ismaël Mejía <ma...@gmail.com>, is PR[3] the expected
>> >         level of support for unbounded pipelines and hence ready for
>> review?
>> >         @Jozsef Bartok <ma...@hazelcast.com>, can you help me
>> out
>> >         to get support for unbounded splittable DoFn's into Jet[4]?
>> >         @Xinyu Liu <ma...@gmail.com>, is PR[5] the
>> expected
>> >         level of support for unbounded pipelines and hence ready for
>> review?
>> >
>> >         1: https://github.com/apache/beam/pull/12519
>> >         2: https://github.com/apache/beam/pull/12594
>> >         3: https://github.com/apache/beam/pull/12603
>> >         4: https://github.com/apache/beam/pull/12616
>> >         5: https://github.com/apache/beam/pull/12617
>> >
>> >         On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <lcwik@google.com
>> >         <ma...@google.com>> wrote:
>> >
>> >             There shouldn't be any changes required since the wrapper
>> >             will smoothly transition the execution to be run as an SDF.
>> >             New IOs should strongly prefer to use SDF since it should be
>> >             simpler to write and will be more flexible but they can use
>> >             the "*Source"-based APIs. Eventually we'll deprecate the
>> >             APIs but we will never stop supporting them. Eventually they
>> >             should all be migrated to use SDF and if there is another
>> >             major Beam version, we'll finally be able to remove them.
>> >
>> >             On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko
>> >             <aromanenko.dev@gmail.com <mailto:aromanenko.dev@gmail.com
>> >>
>> >             wrote:
>> >
>> >                 Hi Luke,
>> >
>> >                 Great to hear about such progress on this!
>> >
>> >                 Talking about opt-out for all runners in the future,
>> >                 will it require any code changes for current
>> >                 “*Source”-based IOs or the wrappers should completely
>> >                 smooth this transition?
>> >                 Do we need to require to create new IOs only based on
>> >                 SDF or again, the wrappers should help to avoid this?
>> >
>> >>                 On 10 Aug 2020, at 22:59, Luke Cwik <lcwik@google.com
>> >>                 <ma...@google.com>> wrote:
>> >>
>> >>                 In the past couple of months wrappers[1, 2] have been
>> >>                 added to the Beam Java SDK which can execute
>> >>                 BoundedSource and UnboundedSource as Splittable DoFns.
>> >>                 These have been opt-out for portable pipelines (e.g.
>> >>                 Dataflow runner v2, XLang pipelines on Flink/Spark)
>> >>                 and opt-in using an experiment for all other pipelines.
>> >>
>> >>                 I would like to start making the non-portable
>> >>                 pipelines starting with the DirectRunner[3] to be
>> >>                 opt-out with the plan that eventually all runners will
>> >>                 only execute splittable DoFns and the
>> >>                 BoundedSource/UnboundedSource specific execution logic
>> >>                 from the runners will be removed.
>> >>
>> >>                 Users will be able to opt-in any pipeline using the
>> >>                 experiment 'use_sdf_read' and opt-out with the
>> >>                 experiment 'use_deprecated_read'. (For portable
>> >>                 pipelines these experiments were 'beam_fn_api' and
>> >>                 'beam_fn_api_use_deprecated_read' respectively and I
>> >>                 have added these two additional aliases to make the
>> >>                 experience less confusing).
>> >>
>> >>                 1:
>> >>
>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
>> >>                 2:
>> >>
>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
>> >>                 3: https://github.com/apache/beam/pull/12519
>> >
>> >
>> >
>> >     --
>> >     Pulasthi S. Wickramasinghe
>> >     PhD Candidate  | Research Assistant
>> >     School of Informatics and Computing | Digital Science Center
>> >     Indiana University, Bloomington
>> >     cell: 224-386-9035
>> >
>>
>

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Luke Cwik <lc...@google.com>.
For the 2.25 release, the Java Direct, Flink, Jet, Samza, and Twister2
runners will use SDF-powered Read transforms. Users can opt out
with --experiments=use_deprecated_read.

I was unable to get Spark done for 2.25 because I found out that Spark
streaming doesn't support watermark holds[1]. If someone knows more about the
watermark system in Spark, I could use some guidance here. I believe I have
a version of unbounded SDF support written for Spark: I get all the expected
output from tests, but watermarks aren't being held back, so PAssert fails.

Also, I started a doc[2] to produce an updated blog post since the original
SplittableDoFn blog from 2017 is out of date[3]. I was thinking of making
this a new blog post and having the old blog post point to it. We could
also remove the old blog post and/or update it. Any thoughts?

Finally, I have a clean-up PR[4] that pushes the Read -> primitive Read
expansion into each of the runners instead of having it inside the Read
transform in beam-sdks-java-core.

1: https://github.com/apache/beam/pull/12603
2: http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
3: https://beam.apache.org/blog/splittable-do-fn/
4: https://github.com/apache/beam/pull/13006


On Fri, Aug 28, 2020 at 1:45 AM Maximilian Michels <mx...@apache.org> wrote:

> Thanks Luke! I've had a pass.
>
> -Max
>
> On 28.08.20 01:22, Luke Cwik wrote:
> > As an update.
> >
> > Direct and Twister2 are done.
> > Samza: is ready for review[1].
> > Flink: is almost ready for review. [2] lays all the groundwork for the
> > migration and [3] finishes the migration (there is a timeout happening
> > in FlinkSubmissionTest that I'm trying to figure out).
> > No further updates on Spark[4] or Jet[5].
> >
> > @Maximilian Michels <ma...@apache.org> or @thw@apache.org
> > <ma...@gmail.com>, can either of you take a look at the
> > Flink PRs?
> > @ke.wu.cs@icloud.com <ma...@icloud.com>, Since Xinyu
> delegated
> > to you, can you take another look at the Samza PR?
> >
> > 1: https://github.com/apache/beam/pull/12617
> > 2: https://github.com/apache/beam/pull/12706
> > 3: https://github.com/apache/beam/pull/12708
> > 4: https://github.com/apache/beam/pull/12603
> > 5: https://github.com/apache/beam/pull/12616
> >
> > On Tue, Aug 18, 2020 at 11:42 AM Pulasthi Supun Wickramasinghe
> > <pulasthi911@gmail.com <ma...@gmail.com>> wrote:
> >
> >     Hi Luke
> >
> >     Will take a look at this as soon as possible and get back to you.
> >
> >     Best Regards,
> >     Pulasthi
> >
> >     On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik <lcwik@google.com
> >     <ma...@google.com>> wrote:
> >
> >         I have made some good progress here and have gotten to the
> >         following state for non-portable runners:
> >
> >         DirectRunner[1]: Merged. Supports Read.Bounded and
> Read.Unbounded.
> >         Twister2[2]: Ready for review. Supports Read.Bounded, the
> >         current runner doesn't support unbounded pipelines.
> >         Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not
> >         certain about level of unbounded pipeline support coverage since
> >         Spark uses its own tiny suite of tests to get unbounded pipeline
> >         coverage instead of the validates runner set.
> >         Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely
> >         needs additional work.
> >         Samza[5]: WIP. Supports Read.Bounded. Not certain about level of
> >         unbounded pipeline support coverage since Spark uses its own
> >         tiny suite of tests to get unbounded pipeline coverage instead
> >         of the validates runner set.
> >         Flink: Unstarted.
> >
> >         @Pulasthi Supun Wickramasinghe <ma...@gmail.com> ,
> >         can you help me with the Twister2 PR[2]?
> >         @Ismaël Mejía <ma...@gmail.com>, is PR[3] the expected
> >         level of support for unbounded pipelines and hence ready for
> review?
> >         @Jozsef Bartok <ma...@hazelcast.com>, can you help me out
> >         to get support for unbounded splittable DoFn's into Jet[4]?
> >         @Xinyu Liu <ma...@gmail.com>, is PR[5] the expected
> >         level of support for unbounded pipelines and hence ready for
> review?
> >
> >         1: https://github.com/apache/beam/pull/12519
> >         2: https://github.com/apache/beam/pull/12594
> >         3: https://github.com/apache/beam/pull/12603
> >         4: https://github.com/apache/beam/pull/12616
> >         5: https://github.com/apache/beam/pull/12617
> >
> >         On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <lcwik@google.com
> >         <ma...@google.com>> wrote:
> >
> >             There shouldn't be any changes required since the wrapper
> >             will smoothly transition the execution to be run as an SDF.
> >             New IOs should strongly prefer to use SDF since it should be
> >             simpler to write and will be more flexible but they can use
> >             the "*Source"-based APIs. Eventually we'll deprecate the
> >             APIs but we will never stop supporting them. Eventually they
> >             should all be migrated to use SDF and if there is another
> >             major Beam version, we'll finally be able to remove them.
> >
> >             On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko
> >             <aromanenko.dev@gmail.com <ma...@gmail.com>>
> >             wrote:
> >
> >                 Hi Luke,
> >
> >                 Great to hear about such progress on this!
> >
> >                 Talking about opt-out for all runners in the future,
> >                 will it require any code changes for current
> >                 “*Source”-based IOs or the wrappers should completely
> >                 smooth this transition?
> >                 Do we need to require to create new IOs only based on
> >                 SDF or again, the wrappers should help to avoid this?
> >
> >>                 On 10 Aug 2020, at 22:59, Luke Cwik <lcwik@google.com
> >>                 <ma...@google.com>> wrote:
> >>
> >>                 In the past couple of months wrappers[1, 2] have been
> >>                 added to the Beam Java SDK which can execute
> >>                 BoundedSource and UnboundedSource as Splittable DoFns.
> >>                 These have been opt-out for portable pipelines (e.g.
> >>                 Dataflow runner v2, XLang pipelines on Flink/Spark)
> >>                 and opt-in using an experiment for all other pipelines.
> >>
> >>                 I would like to start making the non-portable
> >>                 pipelines starting with the DirectRunner[3] to be
> >>                 opt-out with the plan that eventually all runners will
> >>                 only execute splittable DoFns and the
> >>                 BoundedSource/UnboundedSource specific execution logic
> >>                 from the runners will be removed.
> >>
> >>                 Users will be able to opt-in any pipeline using the
> >>                 experiment 'use_sdf_read' and opt-out with the
> >>                 experiment 'use_deprecated_read'. (For portable
> >>                 pipelines these experiments were 'beam_fn_api' and
> >>                 'beam_fn_api_use_deprecated_read' respectively and I
> >>                 have added these two additional aliases to make the
> >>                 experience less confusing).
> >>
> >>                 1:
> >>
> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
> >>                 2:
> >>
> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
> >>                 3: https://github.com/apache/beam/pull/12519
> >
> >
> >
> >     --
> >     Pulasthi S. Wickramasinghe
> >     PhD Candidate  | Research Assistant
> >     School of Informatics and Computing | Digital Science Center
> >     Indiana University, Bloomington
> >     cell: 224-386-9035
> >
>

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Maximilian Michels <mx...@apache.org>.
Thanks Luke! I've had a pass.

-Max

On 28.08.20 01:22, Luke Cwik wrote:
> As an update.
> 
> Direct and Twister2 are done.
> Samza: is ready for review[1].
> Flink: is almost ready for review. [2] lays all the groundwork for the 
> migration and [3] finishes the migration (there is a timeout happening 
> in FlinkSubmissionTest that I'm trying to figure out).
> No further updates on Spark[4] or Jet[5].
> 
> @Maximilian Michels <ma...@apache.org> or @thw@apache.org 
> <ma...@gmail.com>, can either of you take a look at the 
> Flink PRs?
> @ke.wu.cs@icloud.com <ma...@icloud.com>, Since Xinyu delegated 
> to you, can you take another look at the Samza PR?
> 
> 1: https://github.com/apache/beam/pull/12617
> 2: https://github.com/apache/beam/pull/12706
> 3: https://github.com/apache/beam/pull/12708
> 4: https://github.com/apache/beam/pull/12603
> 5: https://github.com/apache/beam/pull/12616
> 
> On Tue, Aug 18, 2020 at 11:42 AM Pulasthi Supun Wickramasinghe 
> <pulasthi911@gmail.com <ma...@gmail.com>> wrote:
> 
>     Hi Luke
> 
>     Will take a look at this as soon as possible and get back to you.
> 
>     Best Regards,
>     Pulasthi
> 
>     On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik <lcwik@google.com
>     <ma...@google.com>> wrote:
> 
>         I have made some good progress here and have gotten to the
>         following state for non-portable runners:
> 
>         DirectRunner[1]: Merged. Supports Read.Bounded and Read.Unbounded.
>         Twister2[2]: Ready for review. Supports Read.Bounded, the
>         current runner doesn't support unbounded pipelines.
>         Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not
>         certain about level of unbounded pipeline support coverage since
>         Spark uses its own tiny suite of tests to get unbounded pipeline
>         coverage instead of the validates runner set.
>         Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely
>         needs additional work.
>         Samza[5]: WIP. Supports Read.Bounded. Not certain about level of
>         unbounded pipeline support coverage since Spark uses its own
>         tiny suite of tests to get unbounded pipeline coverage instead
>         of the validates runner set.
>         Flink: Unstarted.
> 
>         @Pulasthi Supun Wickramasinghe <ma...@gmail.com> ,
>         can you help me with the Twister2 PR[2]?
>         @Ismaël Mejía <ma...@gmail.com>, is PR[3] the expected
>         level of support for unbounded pipelines and hence ready for review?
>         @Jozsef Bartok <ma...@hazelcast.com>, can you help me out
>         to get support for unbounded splittable DoFn's into Jet[4]?
>         @Xinyu Liu <ma...@gmail.com>, is PR[5] the expected
>         level of support for unbounded pipelines and hence ready for review?
> 
>         1: https://github.com/apache/beam/pull/12519
>         2: https://github.com/apache/beam/pull/12594
>         3: https://github.com/apache/beam/pull/12603
>         4: https://github.com/apache/beam/pull/12616
>         5: https://github.com/apache/beam/pull/12617
> 
>         On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <lcwik@google.com
>         <ma...@google.com>> wrote:
> 
>             There shouldn't be any changes required since the wrapper
>             will smoothly transition the execution to be run as an SDF.
>             New IOs should strongly prefer to use SDF since it should be
>             simpler to write and will be more flexible but they can use
>             the "*Source"-based APIs. Eventually we'll deprecate the
>             APIs but we will never stop supporting them. Eventually they
>             should all be migrated to use SDF and if there is another
>             major Beam version, we'll finally be able to remove them.
> 
>             On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko
>             <aromanenko.dev@gmail.com <ma...@gmail.com>>
>             wrote:
> 
>                 Hi Luke,
> 
>                 Great to hear about such progress on this!
> 
>                 Talking about opt-out for all runners in the future,
>                 will it require any code changes for current
>                 “*Source”-based IOs or the wrappers should completely
>                 smooth this transition?
>                 Do we need to require to create new IOs only based on
>                 SDF or again, the wrappers should help to avoid this?
> 
>>                 On 10 Aug 2020, at 22:59, Luke Cwik <lcwik@google.com
>>                 <ma...@google.com>> wrote:
>>
>>                 In the past couple of months wrappers[1, 2] have been
>>                 added to the Beam Java SDK which can execute
>>                 BoundedSource and UnboundedSource as Splittable DoFns.
>>                 These have been opt-out for portable pipelines (e.g.
>>                 Dataflow runner v2, XLang pipelines on Flink/Spark)
>>                 and opt-in using an experiment for all other pipelines.
>>
>>                 I would like to start making the non-portable
>>                 pipelines starting with the DirectRunner[3] to be
>>                 opt-out with the plan that eventually all runners will
>>                 only execute splittable DoFns and the
>>                 BoundedSource/UnboundedSource specific execution logic
>>                 from the runners will be removed.
>>
>>                 Users will be able to opt-in any pipeline using the
>>                 experiment 'use_sdf_read' and opt-out with the
>>                 experiment 'use_deprecated_read'. (For portable
>>                 pipelines these experiments were 'beam_fn_api' and
>>                 'beam_fn_api_use_deprecated_read' respectively and I
>>                 have added these two additional aliases to make the
>>                 experience less confusing).
>>
>>                 1:
>>                 https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
>>                 2:
>>                 https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
>>                 3: https://github.com/apache/beam/pull/12519
> 
> 
> 
>     -- 
>     Pulasthi S. Wickramasinghe
>     PhD Candidate  | Research Assistant
>     School of Informatics and Computing | Digital Science Center
>     Indiana University, Bloomington
>     cell: 224-386-9035
> 

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Luke Cwik <lc...@google.com>.
As an update:

Direct and Twister2 are done.
Samza: ready for review[1].
Flink: almost ready for review. [2] lays all the groundwork for the
migration and [3] finishes the migration (there is a timeout happening in
FlinkSubmissionTest that I'm trying to figure out).
Spark[4] and Jet[5]: no further updates.

@Maximilian Michels <mx...@apache.org> or @thw@apache.org
<th...@gmail.com>, can either of you take a look at the Flink PRs?
@ke.wu.cs@icloud.com <ke...@icloud.com>, since Xinyu delegated to you,
can you take another look at the Samza PR?

1: https://github.com/apache/beam/pull/12617
2: https://github.com/apache/beam/pull/12706
3: https://github.com/apache/beam/pull/12708
4: https://github.com/apache/beam/pull/12603
5: https://github.com/apache/beam/pull/12616

On Tue, Aug 18, 2020 at 11:42 AM Pulasthi Supun Wickramasinghe <
pulasthi911@gmail.com> wrote:

> Hi Luke
>
> Will take a look at this as soon as possible and get back to you.
>
> Best Regards,
> Pulasthi
>
> On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik <lc...@google.com> wrote:
>
>> I have made some good progress here and have gotten to the following
>> state for non-portable runners:
>>
>> DirectRunner[1]: Merged. Supports Read.Bounded and Read.Unbounded.
>> Twister2[2]: Ready for review. Supports Read.Bounded, the current runner
>> doesn't support unbounded pipelines.
>> Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not certain
>> about level of unbounded pipeline support coverage since Spark uses its own
>> tiny suite of tests to get unbounded pipeline coverage instead of the
>> validates runner set.
>> Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely needs
>> additional work.
>> Samza[5]: WIP. Supports Read.Bounded. Not certain about level of
>> unbounded pipeline support coverage since Spark uses its own tiny suite of
>> tests to get unbounded pipeline coverage instead of the validates runner
>> set.
>> Flink: Unstarted.
>>
>> @Pulasthi Supun Wickramasinghe <pu...@gmail.com> , can you help me
>> with the Twister2 PR[2]?
>> @Ismaël Mejía <ie...@gmail.com>, is PR[3] the expected level of
>> support for unbounded pipelines and hence ready for review?
>> @Jozsef Bartok <jo...@hazelcast.com>, can you help me out to get support
>> for unbounded splittable DoFn's into Jet[4]?
>> @Xinyu Liu <xi...@gmail.com>, is PR[5] the expected level of
>> support for unbounded pipelines and hence ready for review?
>>
>> 1: https://github.com/apache/beam/pull/12519
>> 2: https://github.com/apache/beam/pull/12594
>> 3: https://github.com/apache/beam/pull/12603
>> 4: https://github.com/apache/beam/pull/12616
>> 5: https://github.com/apache/beam/pull/12617
>>
>> On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> There shouldn't be any changes required since the wrapper will smoothly
>>> transition the execution to be run as an SDF. New IOs should strongly
>>> prefer to use SDF since it should be simpler to write and will be more
>>> flexible but they can use the "*Source"-based APIs. Eventually we'll
>>> deprecate the APIs but we will never stop supporting them. Eventually they
>>> should all be migrated to use SDF and if there is another major Beam
>>> version, we'll finally be able to remove them.
>>>
>>> On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>>
>>>> Hi Luke,
>>>>
>>>> Great to hear about such progress on this!
>>>>
>>>> Talking about opt-out for all runners in the future, will it require
>>>> any code changes for current “*Source”-based IOs or the wrappers should
>>>> completely smooth this transition?
>>>> Do we need to require to create new IOs only based on SDF or again, the
>>>> wrappers should help to avoid this?
>>>>
>>>> On 10 Aug 2020, at 22:59, Luke Cwik <lc...@google.com> wrote:
>>>>
>>>> In the past couple of months wrappers[1, 2] have been added to the Beam
>>>> Java SDK which can execute BoundedSource and UnboundedSource as Splittable
>>>> DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner
>>>> v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all
>>>> other pipelines.
>>>>
>>>> I would like to start making the non-portable pipelines starting with
>>>> the DirectRunner[3] to be opt-out with the plan that eventually all runners
>>>> will only execute splittable DoFns and the BoundedSource/UnboundedSource
>>>> specific execution logic from the runners will be removed.
>>>>
>>>> Users will be able to opt-in any pipeline using the experiment
>>>> 'use_sdf_read' and opt-out with the experiment 'use_deprecated_read'. (For
>>>> portable pipelines these experiments were 'beam_fn_api' and
>>>> 'beam_fn_api_use_deprecated_read' respectively and I have added these two
>>>> additional aliases to make the experience less confusing).
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
>>>> 2:
>>>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
>>>> 3: https://github.com/apache/beam/pull/12519
>>>>
>>>>
>>>>
>
> --
> Pulasthi S. Wickramasinghe
> PhD Candidate  | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> cell: 224-386-9035
>

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Pulasthi Supun Wickramasinghe <pu...@gmail.com>.
Hi Luke

Will take a look at this as soon as possible and get back to you.

Best Regards,
Pulasthi

On Tue, Aug 18, 2020 at 2:30 PM Luke Cwik <lc...@google.com> wrote:

> I have made some good progress here and have gotten to the following state
> for non-portable runners:
>
> DirectRunner[1]: Merged. Supports Read.Bounded and Read.Unbounded.
> Twister2[2]: Ready for review. Supports Read.Bounded, the current runner
> doesn't support unbounded pipelines.
> Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not certain
> about level of unbounded pipeline support coverage since Spark uses its own
> tiny suite of tests to get unbounded pipeline coverage instead of the
> validates runner set.
> Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely needs
> additional work.
> Samza[5]: WIP. Supports Read.Bounded. Not certain about level of unbounded
> pipeline support coverage since Spark uses its own tiny suite of tests to
> get unbounded pipeline coverage instead of the validates runner set.
> Flink: Unstarted.
>
> @Pulasthi Supun Wickramasinghe <pu...@gmail.com> , can you help me
> with the Twister2 PR[2]?
> @Ismaël Mejía <ie...@gmail.com>, is PR[3] the expected level of support
> for unbounded pipelines and hence ready for review?
> @Jozsef Bartok <jo...@hazelcast.com>, can you help me out to get support
> for unbounded splittable DoFn's into Jet[4]?
> @Xinyu Liu <xi...@gmail.com>, is PR[5] the expected level of
> support for unbounded pipelines and hence ready for review?
>
> 1: https://github.com/apache/beam/pull/12519
> 2: https://github.com/apache/beam/pull/12594
> 3: https://github.com/apache/beam/pull/12603
> 4: https://github.com/apache/beam/pull/12616
> 5: https://github.com/apache/beam/pull/12617
>
> On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <lc...@google.com> wrote:
>
>> There shouldn't be any changes required since the wrapper will smoothly
>> transition the execution to be run as an SDF. New IOs should strongly
>> prefer to use SDF since it should be simpler to write and will be more
>> flexible but they can use the "*Source"-based APIs. Eventually we'll
>> deprecate the APIs but we will never stop supporting them. Eventually they
>> should all be migrated to use SDF and if there is another major Beam
>> version, we'll finally be able to remove them.
>>
>> On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>> Hi Luke,
>>>
>>> Great to hear about such progress on this!
>>>
>>> Talking about opt-out for all runners in the future, will it require any
>>> code changes for current “*Source”-based IOs or the wrappers should
>>> completely smooth this transition?
>>> Do we need to require to create new IOs only based on SDF or again, the
>>> wrappers should help to avoid this?
>>>
>>> On 10 Aug 2020, at 22:59, Luke Cwik <lc...@google.com> wrote:
>>>
>>> In the past couple of months wrappers[1, 2] have been added to the Beam
>>> Java SDK which can execute BoundedSource and UnboundedSource as Splittable
>>> DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner
>>> v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all
>>> other pipelines.
>>>
>>> I would like to start making the non-portable pipelines starting with
>>> the DirectRunner[3] to be opt-out with the plan that eventually all runners
>>> will only execute splittable DoFns and the BoundedSource/UnboundedSource
>>> specific execution logic from the runners will be removed.
>>>
>>> Users will be able to opt-in any pipeline using the experiment
>>> 'use_sdf_read' and opt-out with the experiment 'use_deprecated_read'. (For
>>> portable pipelines these experiments were 'beam_fn_api' and
>>> 'beam_fn_api_use_deprecated_read' respectively and I have added these two
>>> additional aliases to make the experience less confusing).
>>>
>>> 1:
>>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
>>> 2:
>>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
>>> 3: https://github.com/apache/beam/pull/12519
>>>
>>>
>>>

-- 
Pulasthi S. Wickramasinghe
PhD Candidate  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Luke Cwik <lc...@google.com>.
I have made some good progress here and have gotten to the following state
for non-portable runners:

DirectRunner[1]: Merged. Supports Read.Bounded and Read.Unbounded.
Twister2[2]: Ready for review. Supports Read.Bounded; the current runner
doesn't support unbounded pipelines.
Spark[3]: WIP. Supports Read.Bounded, Nexmark suite passes. Not certain
about the level of unbounded pipeline support coverage since Spark uses its
own tiny suite of tests to get unbounded pipeline coverage instead of the
validates runner set.
Jet[4]: WIP. Supports Read.Bounded. Read.Unbounded definitely needs
additional work.
Samza[5]: WIP. Supports Read.Bounded. Not certain about the level of
unbounded pipeline support coverage since Spark uses its own tiny suite of
tests to get unbounded pipeline coverage instead of the validates runner
set.
Flink: Not started.

@Pulasthi Supun Wickramasinghe <pu...@gmail.com> , can you help me
with the Twister2 PR[2]?
@Ismaël Mejía <ie...@gmail.com>, is PR[3] the expected level of support
for unbounded pipelines and hence ready for review?
@Jozsef Bartok <jo...@hazelcast.com>, can you help me get support for
unbounded splittable DoFns into Jet[4]?
@Xinyu Liu <xi...@gmail.com>, is PR[5] the expected level of support
for unbounded pipelines and hence ready for review?

1: https://github.com/apache/beam/pull/12519
2: https://github.com/apache/beam/pull/12594
3: https://github.com/apache/beam/pull/12603
4: https://github.com/apache/beam/pull/12616
5: https://github.com/apache/beam/pull/12617

On Tue, Aug 11, 2020 at 10:55 AM Luke Cwik <lc...@google.com> wrote:

> There shouldn't be any changes required since the wrapper will smoothly
> transition the execution to be run as an SDF. New IOs should strongly
> prefer to use SDF since it should be simpler to write and will be more
> flexible but they can use the "*Source"-based APIs. Eventually we'll
> deprecate the APIs but we will never stop supporting them. Eventually they
> should all be migrated to use SDF and if there is another major Beam
> version, we'll finally be able to remove them.
>
> On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko <ar...@gmail.com>
> wrote:
>
>> Hi Luke,
>>
>> Great to hear about such progress on this!
>>
>> Talking about opt-out for all runners in the future, will it require any
>> code changes for current “*Source”-based IOs or the wrappers should
>> completely smooth this transition?
>> Do we need to require to create new IOs only based on SDF or again, the
>> wrappers should help to avoid this?
>>
>> On 10 Aug 2020, at 22:59, Luke Cwik <lc...@google.com> wrote:
>>
>> In the past couple of months wrappers[1, 2] have been added to the Beam
>> Java SDK which can execute BoundedSource and UnboundedSource as Splittable
>> DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner
>> v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all
>> other pipelines.
>>
>> I would like to start making the non-portable pipelines starting with the
>> DirectRunner[3] to be opt-out with the plan that eventually all runners
>> will only execute splittable DoFns and the BoundedSource/UnboundedSource
>> specific execution logic from the runners will be removed.
>>
>> Users will be able to opt-in any pipeline using the experiment
>> 'use_sdf_read' and opt-out with the experiment 'use_deprecated_read'. (For
>> portable pipelines these experiments were 'beam_fn_api' and
>> 'beam_fn_api_use_deprecated_read' respectively and I have added these two
>> additional aliases to make the experience less confusing).
>>
>> 1:
>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
>> 2:
>> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
>> 3: https://github.com/apache/beam/pull/12519
>>
>>
>>

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Luke Cwik <lc...@google.com>.
There shouldn't be any changes required, since the wrapper will smoothly
transition the execution to be run as an SDF. New IOs should strongly
prefer SDF, since it should be simpler to write and will be more
flexible, but they can still use the "*Source"-based APIs. Eventually we'll
deprecate those APIs, but we will not stop supporting them. Existing IOs
should eventually all be migrated to SDF and, if there is another major
Beam version, we'll finally be able to remove the "*Source"-based APIs.
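
For illustration only, the opt-in/opt-out experiments from the original
proposal (quoted below) could be modelled roughly as follows. This is a
sketch under stated assumptions, not the code in Read.java: the class
ReadExperimentSketch, the method useSdfRead, the sdfByDefault flag, and
the rule that an explicit opt-out wins over an explicit opt-in are all
hypothetical. Only the four experiment names come from this thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of the opt-in/opt-out decision; not Beam's implementation. */
public class ReadExperimentSketch {

  /**
   * Decides whether a BoundedSource/UnboundedSource read would go through
   * the SDF wrapper.
   *
   * @param experiments the experiment names set on the pipeline
   * @param sdfByDefault true where SDF execution is opt-out, false where it is opt-in
   */
  public static boolean useSdfRead(Set<String> experiments, boolean sdfByDefault) {
    // Assumption: an explicit opt-out always wins. Both spellings are from the thread.
    if (experiments.contains("use_deprecated_read")
        || experiments.contains("beam_fn_api_use_deprecated_read")) {
      return false;
    }
    // Explicit opt-in, again accepting both spellings.
    if (experiments.contains("use_sdf_read") || experiments.contains("beam_fn_api")) {
      return true;
    }
    // No relevant experiment set: fall back to the default for this kind of pipeline.
    return sdfByDefault;
  }

  public static void main(String[] args) {
    Set<String> none = Collections.emptySet();
    Set<String> optIn = new HashSet<>(Arrays.asList("use_sdf_read"));
    Set<String> optOut = new HashSet<>(Arrays.asList("use_deprecated_read"));

    System.out.println("opt-in default, no experiment:  " + useSdfRead(none, false));
    System.out.println("opt-in default, use_sdf_read:   " + useSdfRead(optIn, false));
    System.out.println("opt-out default, no experiment: " + useSdfRead(none, true));
    System.out.println("opt-out default, opted out:     " + useSdfRead(optOut, true));
  }
}
```

On a real pipeline the experiment would be supplied as a pipeline option,
for example --experiments=use_deprecated_read, rather than as a set built
by hand.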

On Tue, Aug 11, 2020 at 8:40 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> Hi Luke,
>
> Great to hear about such progress on this!
>
> Talking about opt-out for all runners in the future, will it require any
> code changes for current “*Source”-based IOs or the wrappers should
> completely smooth this transition?
> Do we need to require to create new IOs only based on SDF or again, the
> wrappers should help to avoid this?
>
> On 10 Aug 2020, at 22:59, Luke Cwik <lc...@google.com> wrote:
>
> In the past couple of months wrappers[1, 2] have been added to the Beam
> Java SDK which can execute BoundedSource and UnboundedSource as Splittable
> DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner
> v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all
> other pipelines.
>
> I would like to start making the non-portable pipelines starting with the
> DirectRunner[3] to be opt-out with the plan that eventually all runners
> will only execute splittable DoFns and the BoundedSource/UnboundedSource
> specific execution logic from the runners will be removed.
>
> Users will be able to opt-in any pipeline using the experiment
> 'use_sdf_read' and opt-out with the experiment 'use_deprecated_read'. (For
> portable pipelines these experiments were 'beam_fn_api' and
> 'beam_fn_api_use_deprecated_read' respectively and I have added these two
> additional aliases to make the experience less confusing).
>
> 1:
> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
> 2:
> https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
> 3: https://github.com/apache/beam/pull/12519
>
>
>

Re: [DISCUSS][BEAM-10670] Migrating BoundedSource/UnboundedSource to execute as a Splittable DoFn for non-portable Java runners

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Luke,

Great to hear about such progress on this!

Talking about opt-out for all runners in the future, will it require any code changes for current “*Source”-based IOs, or should the wrappers completely smooth this transition?
Do we need to require that new IOs be created only based on SDF, or, again, should the wrappers help to avoid this?

> On 10 Aug 2020, at 22:59, Luke Cwik <lc...@google.com> wrote:
> 
> In the past couple of months wrappers[1, 2] have been added to the Beam Java SDK which can execute BoundedSource and UnboundedSource as Splittable DoFns. These have been opt-out for portable pipelines (e.g. Dataflow runner v2, XLang pipelines on Flink/Spark) and opt-in using an experiment for all other pipelines.
> 
> I would like to start making the non-portable pipelines starting with the DirectRunner[3] to be opt-out with the plan that eventually all runners will only execute splittable DoFns and the BoundedSource/UnboundedSource specific execution logic from the runners will be removed.
> 
> Users will be able to opt-in any pipeline using the experiment 'use_sdf_read' and opt-out with the experiment 'use_deprecated_read'. (For portable pipelines these experiments were 'beam_fn_api' and 'beam_fn_api_use_deprecated_read' respectively and I have added these two additional aliases to make the experience less confusing).
> 
> 1: https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L275
> 2: https://github.com/apache/beam/blob/af1ce643d8fde5352d4519a558de4a2dfd24721d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L449
> 3: https://github.com/apache/beam/pull/12519