Posted to dev@beam.apache.org by Shen Li <cs...@gmail.com> on 2018/04/02 20:41:11 UTC

How can runners make use of sink parallelism?

Hi,

It seems that there is no Sink base class. Some IO connectors (e.g., KafkaIO
and TextIO) provide a getNumShards() API, but it is not generally available
across all existing Beam IO connectors, let alone potential custom ones.
Although some IO connectors are implemented using ParDo/GBK, it is unclear
whether the runner can directly parallelize those transforms (e.g., what if
the connector only writes to a single file?). Is there a general way for
runners to take advantage of sink parallelism?

Thanks,
Shen

Re: How can runners make use of sink parallelism?

Posted by Shen Li <cs...@gmail.com>.
Thanks for the explanation!

Shen

On Wed, Apr 4, 2018 at 12:38 AM, Eugene Kirpichov <ki...@google.com> wrote:

Re: How can runners make use of sink parallelism?

Posted by Eugene Kirpichov <ki...@google.com>.
Hi Shen,

There is no "IO connector API" in Beam (not counting the deprecated Source
API), IO is merely an informal term for a PTransform that interacts in some
way with some external storage system. So whatever question you're asking
about IO connectors, you might as well be asking it about PTransforms in
general. See
https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696

To answer your question, then: is it the responsibility of a PTransform
author to make sure their code works correctly when different elements of
various PCollections are processed by downstream ParDos in parallel? Yes, of
course.

Things like "writing to a single file" are simply implemented by
non-parallel code - e.g. GBK the data onto a single key, and write a ParDo
that takes the single KV<K, Iterable<V>> and writes the Iterable to the
file. This is, by definition, sequential (modulo windowing/triggering -
different windows and different firings for the same key can still be
processed in parallel).
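As a rough illustration of that pattern, here is a plain-Java model (not real Beam API; the class and method names are made up for this sketch) of why keying everything onto one key makes the write sequential:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the "GBK onto a single key, then write" pattern. All names
// here are illustrative; a real pipeline would use WithKeys/GroupByKey/ParDo.
public class SingleKeyWriteModel {

    // Models the GBK: every element gets the same key, so grouping yields
    // exactly one KV<key, Iterable<value>>.
    static Map<String, List<String>> groupOntoSingleKey(List<String> elements) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        grouped.put("the-one-key", new ArrayList<>(elements));
        return grouped;
    }

    // Models the downstream ParDo: it receives one element (the whole
    // iterable), so writing its contents is inherently a sequential loop.
    static List<String> writeIterableToFile(Map<String, List<String>> grouped) {
        List<String> fileLines = new ArrayList<>();
        for (List<String> iterable : grouped.values()) { // exactly one entry
            fileLines.addAll(iterable);
        }
        return fileLines;
    }
}
```

In real Beam Java this corresponds roughly to keying with a single key, a GroupByKey, and a ParDo over the resulting KV<K, Iterable<V>>, as described above.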

On Tue, Apr 3, 2018 at 8:56 PM Shen Li <cs...@gmail.com> wrote:

Re: How can runners make use of sink parallelism?

Posted by Shen Li <cs...@gmail.com>.
Hi Kenn,

Thanks for the response.

I haven't hit any specific issue yet. I think if the IO connector
implementation does take parallelism into consideration, runners can
parallelize primitive transforms in the connector (key-partitioned for GBK
and stateful ParDo, and round robin for stateless ParDo). For example,
TextIO first writes a temp file for every bundle, then uses a void key to
prevent parallelism, and then finalizes the result. It should work properly
in a distributed environment.
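That two-phase shape can be sketched as a toy model (plain Java, no Beam dependency; all names here are hypothetical): parallel bundles each emit a temp shard, and a single-keyed finalize step processes all of the shard names on one worker:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of write-then-finalize: phase 1 can run with one task per
// bundle; phase 2 is funneled through one key, so it runs on one worker.
public class WriteFinalizeModel {

    // Phase 1 (parallel): each bundle writes one temp shard and emits its name.
    static String writeTempShard(int bundleId, List<String> bundle) {
        return "temp-shard-" + bundleId + "-" + bundle.size(); // illustrative name
    }

    // Phase 2 (sequential): all temp shard names arrive under a single
    // void-like key, and one worker moves them to their final locations.
    static List<String> finalizeShards(List<String> tempShards) {
        List<String> finalNames = new ArrayList<>();
        for (int i = 0; i < tempShards.size(); i++) {
            finalNames.add("output-" + i); // stand-in for an atomic rename
        }
        return finalNames;
    }
}
```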

But applications can provide any custom IO connectors, and the runner does
not know whether a connector can be safely parallelized. Can I assume that
it is the applications' responsibility to make sure their IO connector
works correctly when running in parallel?

Thanks,
Shen

On Tue, Apr 3, 2018 at 6:11 PM, Kenneth Knowles <kl...@google.com> wrote:

Re: How can runners make use of sink parallelism?

Posted by Kenneth Knowles <kl...@google.com>.
The runner should generally not need to be aware of any getNumShards() API
on a connector. The connector itself is probably a composite transform (with
a ParDo or two or three somewhere doing the actual writes) and should be
designed to expose available parallelism. Specifying the number of shards
usually limits the parallelism, versus letting the runner use the maximum
allowed parallelism.

If the connector does a GBK to gather input elements into a single iterable,
then that iterable is a single element and cannot be processed in parallel
(except through splittable DoFn, but in that case you may not need the GBK
in the first place). Conversely, if the connector does not do a GBK to
gather input elements, then the runner is permitted to bundle them any way
it wants and process all of them as though in parallel (except for stateful
DoFn, in which case you probably don't need the GBK).

Bundling is an important way that this works, too, since the @FinishBundle
method is really a "flush" method, with @ProcessElement perhaps buffering
up elements to be written to e.g. the same file shard. It is not this
simple in practice but that gives the idea of how even with unrestricted
elementwise parallelism you don't get one shard per element.
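A simplified model of that buffering shape (plain Java, not the real DoFn API; the method names just mirror the @ProcessElement/@FinishBundle annotations):

```java
import java.util.ArrayList;
import java.util.List;

// Buffers elements in processElement and flushes them in finishBundle,
// so output is one shard per bundle rather than one shard per element.
public class BufferingWriterModel {
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> shards = new ArrayList<>();

    // Stand-in for @ProcessElement: buffer instead of writing immediately.
    void processElement(String element) {
        buffer.add(element);
    }

    // Stand-in for @FinishBundle: the "flush" that writes one shard.
    void finishBundle() {
        if (!buffer.isEmpty()) {
            shards.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    List<List<String>> shards() {
        return shards;
    }
}
```

Six elements delivered as two bundles of three produce two shards, not six.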

These are all just ideas, and I'm not the connector expert. But I think the
TL;DR is that a runner shouldn't need to know this - have you hit specific
issues with a particular connector? That could make this a very productive
discussion.

Kenn

On Mon, Apr 2, 2018 at 1:41 PM Shen Li <cs...@gmail.com> wrote: