Posted to dev@beam.apache.org by Steve Niemitz <sn...@apache.org> on 2020/05/15 14:08:10 UTC

Writing a new IO on beam, should I use the source API or SDF?

I'm going to be writing a new IO (in Java) for reading files in a custom
format, and want to make it splittable.  It seems like I have a choice
between the "legacy" source API and the newer, experimental SDF API.  Is
there any guidance on which I should use?  I can likely tolerate some API
churn in the SDF APIs as well.

My target runner is Dataflow.

Thanks!

Re: Writing a new IO on beam, should I use the source API or SDF?

Posted by Boyuan Zhang <bo...@google.com>.
Hi Steve,

Yes that's correct.

On Fri, May 15, 2020 at 2:11 PM Steve Niemitz <sn...@apache.org> wrote:

> ah! ok awesome, I think that was the piece I was misunderstanding.  So I
> _can_ use a SDF to split the work initially (like I was manually doing in
> #1), but it just won't be further split dynamically on dataflow v1 right
> now.  Is my understanding there correct?

Re: Writing a new IO on beam, should I use the source API or SDF?

Posted by Steve Niemitz <sn...@apache.org>.
Ah, OK, awesome! I think that was the piece I was misunderstanding.  So I
_can_ use an SDF to split the work initially (like I was manually doing in
#1), but it just won't be further split dynamically on Dataflow runner v1
right now.  Is my understanding there correct?

Re: Writing a new IO on beam, should I use the source API or SDF?

Posted by Luke Cwik <lc...@google.com>.
Option #3 (don't pre-split, but use an SDF) is the best choice when you
implement @SplitRestriction on the SDF.

The size of each restriction is used to better balance the splits within
Dataflow runner v2, so it is less susceptible to the problem of too many or
unbalanced splits.
For example, if you have 4 workers and make 20 splits, the splits will be
grouped based upon their sizes. So if 19 of those splits are small and 1 is
big, the big one will execute by itself while the 19 small ones will be
handled by the 3 other workers.
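
To make the 4-workers/20-splits example concrete, here is a minimal plain-Java
sketch of size-based grouping. It is an illustration only: it uses a simple
greedy "largest first, to the least-loaded worker" heuristic, which is an
assumption and not necessarily how Dataflow runner v2 groups splits. The class
and method names (SizeBalancer, assign) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration; not part of Beam or Dataflow. */
final class SizeBalancer {
  /** Assigns split sizes to workers and returns one list of sizes per worker. */
  static List<List<Long>> assign(long[] splitSizes, int numWorkers) {
    if (numWorkers <= 0) {
      throw new IllegalArgumentException("numWorkers must be positive");
    }
    long[] sorted = splitSizes.clone();
    Arrays.sort(sorted); // ascending; we walk it backwards for largest-first
    long[] load = new long[numWorkers];
    List<List<Long>> byWorker = new ArrayList<>();
    for (int w = 0; w < numWorkers; w++) {
      byWorker.add(new ArrayList<>());
    }
    for (int i = sorted.length - 1; i >= 0; i--) {
      // Pick the worker with the smallest total size so far.
      int lightest = 0;
      for (int w = 1; w < numWorkers; w++) {
        if (load[w] < load[lightest]) {
          lightest = w;
        }
      }
      byWorker.get(lightest).add(sorted[i]);
      load[lightest] += sorted[i];
    }
    return byWorker;
  }

  public static void main(String[] args) {
    // 19 small splits (size 5) and 1 big split (size 100), 4 workers.
    long[] sizes = new long[20];
    Arrays.fill(sizes, 5L);
    sizes[0] = 100L;
    List<List<Long>> plan = assign(sizes, 4);
    for (int w = 0; w < plan.size(); w++) {
      System.out.println("worker " + w + ": " + plan.get(w).size() + " splits");
    }
  }
}
```

With these made-up sizes, the big split ends up alone on one worker and the 19
small ones are spread over the other three, which matches the behaviour
described in the example.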

Also, dynamic work rebalancing isn't meant to replace those initial splits;
it helps a lot with rebalancing across workers, since a few workers are
usually stragglers and will need some help at the end of a pipeline.
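
As a rough picture of what "helping a straggler" means for an offset-based
restriction, the sketch below splits the unclaimed remainder of a range at a
given fraction. It is a plain-Java model written for this thread, loosely
inspired by how offset-range trackers behave; it is not Beam's implementation,
and the names (RemainderSplit, trySplit) are hypothetical.

```java
/** Hypothetical illustration; not part of Beam. */
final class RemainderSplit {
  /**
   * Splits the range [from, to) after the reader has claimed offsets up to and
   * including lastClaimed (use from - 1 if nothing has been claimed yet).
   *
   * @return {primary, residual} as {start, end} pairs, or null if nothing can
   *     be handed off
   */
  static long[][] trySplit(long from, long to, long lastClaimed, double fractionOfRemainder) {
    if (from > to || lastClaimed < from - 1 || lastClaimed >= to) {
      throw new IllegalArgumentException("lastClaimed must lie in [from - 1, to)");
    }
    if (fractionOfRemainder < 0.0 || fractionOfRemainder > 1.0) {
      throw new IllegalArgumentException("fraction must be in [0, 1]");
    }
    long firstUnclaimed = lastClaimed + 1;
    long unclaimed = to - firstUnclaimed;
    // The current worker keeps this fraction of the unclaimed offsets.
    long splitPos = firstUnclaimed + (long) (unclaimed * fractionOfRemainder);
    if (splitPos >= to) {
      return null; // no work left to give away
    }
    return new long[][] {{from, splitPos}, {splitPos, to}};
  }

  public static void main(String[] args) {
    // A straggler has claimed offsets 0..599 of [0, 1000); give away half the rest.
    long[][] parts = trySplit(0, 1000, 599, 0.5);
    System.out.println("primary  = [" + parts[0][0] + ", " + parts[0][1] + ")");
    System.out.println("residual = [" + parts[1][0] + ", " + parts[1][1] + ")");
  }
}
```

In this model the straggler keeps [0, 800) and the residual [800, 1000) can be
picked up by an idle worker.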

Re: Writing a new IO on beam, should I use the source API or SDF?

Posted by Steve Niemitz <sn...@apache.org>.
Thanks for the replies so far.  I should have specifically mentioned above
that I am building a bounded source.

While I was thinking this through, I realized that I might not actually
need any fancy splitting, since I can calculate all my split points up
front.  I think this goes well with Ismaël's suggestion as well.

I'm curious what the pros and cons would be of these options:
1) Presplit each file into N pieces (based on a target bundle size, similar
to what the Avro reader appears to do), using a standard DoFn to read each
split.
2) Presplit, but use an SDF to support further splitting once it's supported
in Dataflow.  (This would also help if I have files that can't be split up
front.)
3) Don't pre-split, but use an SDF.
4) Use the source API.

I think we've covered 2 and 4 pretty well already, but I'm curious
specifically about the pre-split approach.  Thanks again so far!
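
For reference, the pre-split step in option 1 can be sketched in plain Java as
below. This is only an illustration under assumptions: the file size is known
up front, ranges are expressed as [start, end) byte offsets, and the names
(PreSplitter, preSplit, targetBundleBytes) are hypothetical rather than taken
from Beam or the Avro reader.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; not part of Beam. */
final class PreSplitter {
  /** Returns ranges of at most targetBundleBytes each, covering [0, fileSizeBytes). */
  static List<long[]> preSplit(long fileSizeBytes, long targetBundleBytes) {
    if (fileSizeBytes < 0 || targetBundleBytes <= 0) {
      throw new IllegalArgumentException(
          "fileSizeBytes must be >= 0 and targetBundleBytes must be > 0");
    }
    List<long[]> ranges = new ArrayList<>();
    for (long start = 0; start < fileSizeBytes; start += targetBundleBytes) {
      // The last range is shortened so that it ends exactly at the file size.
      long end = Math.min(start + targetBundleBytes, fileSizeBytes);
      ranges.add(new long[] {start, end});
    }
    return ranges;
  }

  public static void main(String[] args) {
    // A 250-byte file with a 100-byte target yields three ranges.
    for (long[] r : preSplit(250, 100)) {
      System.out.println("[" + r[0] + ", " + r[1] + ")");
    }
  }
}
```

Each range would then become one input element for the DoFn that reads it. A
real reader would still need a format-specific way to find the first record
boundary at or after each start offset.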

Re: Writing a new IO on beam, should I use the source API or SDF?

Posted by Ismaël Mejía <ie...@gmail.com>.
For the bounded case, if you do not have a straightforward way to split at
fractions, or simply do not care about dynamic work rebalancing, you can get
away with a simple DoFn-based implementation (without restrictions) and evolve
from it. More and more IOs in Beam are becoming DoFn-based (even if not SDF)
because you gain the composability advantages.

An interesting question is when we should start deprecating the Source API and
encourage people to write only DoFn-based IOs. I think we are getting to the
maturity point where we can start this discussion.

Re: Writing a new IO on beam, should I use the source API or SDF?

Posted by Luke Cwik <lc...@google.com>.
If it is an unbounded source, then SDF is the winner: you give up nothing
compared to the legacy UnboundedSource API, since Dataflow doesn't support
dynamic splitting of either unbounded SDFs or UnboundedSources (only initial
splitting). You gain the ability to compose sources, and the initial
splitting is done at pipeline execution time for SDFs versus pipeline
construction time for UnboundedSource.

If it is bounded, my gut is to still go with SDF since:
* Dataflow runner v2 supports SDF fully
* The Java/Python SDF APIs have gone through the majority of churn already;
there are some minor clean-ups left, and then I would like to remove the
@Experimental annotations from them after a discussion on dev@ about it
* Being able to compose "sources" is immensely powerful

The caveat is that Dataflow runner v1 doesn't support dynamic splitting of
SDFs today and, depending on how well the runner v2 rollout goes, may never.
The big plus of the legacy source API is that there are already
bounded/unbounded source wrappers that convert legacy sources into SDFs, so
you get full runner v1 and runner v2 support for what the legacy source API
can do today. In exchange, you give up composability and any splitting
support for unbounded SDFs that comes later.

Finally, there is a way to get limited support for dynamic splitting of
bounded and unbounded SDFs for other runners using the composability of
SDFs and the limited depth splitting proposal[1].

1:
https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv
