Posted to dev@beam.apache.org by Matthew Jadczak <mn...@cam.ac.uk> on 2017/01/25 17:36:24 UTC

Conceptually, what are bundles?

Hi,

I’m a finalist CompSci student at the University of Cambridge, and for my final project/dissertation I am writing an implementation of the Beam SDK in Elixir [1]. Given that the Beam project is obviously still very much WIP, it’s still somewhat difficult to find good conceptual overviews of parts of the system, which is crucial when translating the OOP architecture to something completely different. However, I have found many of the design docs scattered around the JIRA and on this list very helpful. (Incidentally, perhaps it would be helpful to maintain a list of them, to help any contributors acquaint themselves with the conceptual vision of the implementation?)

One thing which I have not yet been able to work out is the significance of “bundles” in the SDK. On the one hand, it seems that they are simply an implementation detail, effectively a way to do micro-batch processing efficiently, and indeed they are not mentioned at all in the original Dataflow paper or anywhere in the Beam docs (except in passing). On the other hand, it seems most of the key transforms in the SDK core have a concept of bundles and operate in their terms in practice, while all conceptually being described as just operating on elements.

Do bundles have semantic meaning in the Beam Model? Are there any guidelines as to how a given transform should split its output up into bundles? Should any runner/SDK implementing the Model have that concept, even when other primitives for streaming data processing including things like efficiently transmitting individual elements between stages with backpressure are available in the language/standard libraries? Are there any insights here that I am missing, i.e. were problems present in early versions of the runners solved by adding the concept of bundles?

Thanks so much,
Matt

[1] http://elixir-lang.org/

Re: Conceptually, what are bundles?

Posted by Etienne Chauchot <ec...@gmail.com>.

On 25/01/2017 at 20:34, Kenneth Knowles wrote:
> There's actually not a JIRA filed beyond BEAM-25 for what Eugene is
> referring to. Context: Prior to windowing and streaming it was safe to
> buffer elements in @ProcessElement and then actually perform output in
> @FinishBundle. This pattern is only suitable for global windowing, flushing
> to external systems, or requires perhaps complex manual window hackery. So
> now we'll need a new callback @OnWindowExpiration that occurs
> per-resident-window, before @FinishBundle, for producing output based on
> remaining state before it is discarded.
+1 This is exactly what I need for BEAM-135. Let's imagine that we have a
collection of elements artificially timestamped every 10 seconds and a
fixed windowing of 1 minute. Then each window contains 6 elements. If we
were to buffer the elements in batches of 5 elements, then for each
window we expect to get 2 batches (one of 5 elements, one of 1 element).
For that to happen, we need the @OnWindowExpiration on the DoFn to
materialize the batch of 1 element.
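
The arithmetic above can be checked with a small simulation. This is only a sketch in plain Python, not Beam code: the class BatchingFn and its methods process_element and on_window_expiration are hypothetical names that mimic the proposed callback, and the loop at the bottom stands in for whatever the runner would do when a window closes.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

WINDOW_SIZE_S = 60   # fixed windows of 1 minute
BATCH_SIZE = 5       # flush a batch once 5 elements are buffered


class BatchingFn:
    """Hypothetical DoFn-like class; method names are illustrative only."""

    def __init__(self) -> None:
        self.buffers: Dict[int, List[int]] = defaultdict(list)
        self.batches: List[Tuple[int, List[int]]] = []  # (window start, batch)

    def process_element(self, timestamp_s: int) -> None:
        # Assign the element to its fixed window and buffer it there.
        window_start = timestamp_s - timestamp_s % WINDOW_SIZE_S
        buffer = self.buffers[window_start]
        buffer.append(timestamp_s)
        if len(buffer) == BATCH_SIZE:
            self.batches.append((window_start, list(buffer)))
            buffer.clear()

    def on_window_expiration(self, window_start: int) -> None:
        """Emit whatever is still buffered for a window that is closing."""
        leftover = self.buffers.pop(window_start, [])
        if leftover:
            self.batches.append((window_start, leftover))


fn = BatchingFn()
for ts in range(0, 120, 10):      # 12 elements, one every 10 s: two full windows
    fn.process_element(ts)
for window_start in (0, 60):      # stand-in for the runner closing each window
    fn.on_window_expiration(window_start)

batch_sizes = [(window, len(batch)) for window, batch in fn.batches]
```

Each window ends up with one batch of 5 and one batch of 1. Without the expiration callback, the sixth element of every window would stay in the buffer and never be emitted.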

Re: Conceptually, what are bundles?

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
It makes sense.

Agreed.

Regards
JB


-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Conceptually, what are bundles?

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
There's actually not a JIRA filed beyond BEAM-25 for what Eugene is
referring to. Context: Prior to windowing and streaming it was safe to
buffer elements in @ProcessElement and then actually perform output in
@FinishBundle. This pattern is now only suitable for global windowing or
for flushing to external systems; anything else requires perhaps complex
manual window hackery. So now we'll need a new callback @OnWindowExpiration
that occurs per-resident-window, before @FinishBundle, for producing output
based on remaining state before it is discarded.
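
One way to picture the problem with the buffering pattern is the toy model below. It is a sketch in plain Python rather than Beam code: BufferUntilFinishBundle, its method names and the 60-second fixed window are all assumptions made for illustration. It buffers in process_element and emits once in finish_bundle, then checks which windows the flushed elements came from.

```python
from typing import List, Set

WINDOW_SIZE_S = 60  # assumed fixed window size in seconds


class BufferUntilFinishBundle:
    """Hypothetical DoFn-like class using the pre-windowing buffering pattern."""

    def start_bundle(self) -> None:
        self.buffer: List[int] = []

    def process_element(self, timestamp_s: int) -> None:
        self.buffer.append(timestamp_s)  # no output yet, only buffering

    def finish_bundle(self) -> List[int]:
        flushed, self.buffer = self.buffer, []
        return flushed  # a single output for the whole bundle


def windows_of(timestamps: List[int]) -> Set[int]:
    """Return the start times of the fixed windows the timestamps fall into."""
    return {ts - ts % WINDOW_SIZE_S for ts in timestamps}


fn = BufferUntilFinishBundle()
fn.start_bundle()
for ts in (40, 50, 60, 70):   # a bundle that happens to straddle two windows
    fn.process_element(ts)
output = fn.finish_bundle()
spanned = windows_of(output)
```

Here the single flushed output contains elements from two different windows, so there is no one window it obviously belongs to. Under global windowing, or when the flush only writes to an external system, that question does not arise.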



Re: Conceptually, what are bundles?

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
One more thing.

I think ideally, bundles should not leak into the model at all - e.g.
ideally, startBundle/finishBundle methods in DoFn should not exist. They
interact poorly with windowing.
The proper way to address what is commonly done in these methods is either
Setup/Teardown methods, or a (to be designed) "window close" callback -
there's a JIRA about the latter but I couldn't find it, perhaps +Kenn
Knowles <kl...@google.com> remembers what it is.
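
For readers unfamiliar with the distinction, the toy runner below records the order of lifecycle calls. It is a sketch in plain Python, not the Beam API: LifecycleFn, run and the method names are hypothetical stand-ins for the Setup, StartBundle, ProcessElement, FinishBundle and Teardown hooks, and it assumes a single instance reused across bundles.

```python
from typing import List


class LifecycleFn:
    """Hypothetical DoFn-like class that records the order of lifecycle calls."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def setup(self) -> None:
        self.log.append("setup")          # e.g. open a connection once per instance

    def start_bundle(self) -> None:
        self.log.append("start_bundle")

    def process_element(self, element: int) -> None:
        self.log.append(f"process({element})")

    def finish_bundle(self) -> None:
        self.log.append("finish_bundle")

    def teardown(self) -> None:
        self.log.append("teardown")       # e.g. close the connection


def run(fn: LifecycleFn, bundles: List[List[int]]) -> None:
    """Toy runner: one instance, reused across all bundles."""
    fn.setup()
    for bundle in bundles:
        fn.start_bundle()
        for element in bundle:
            fn.process_element(element)
        fn.finish_bundle()
    fn.teardown()


fn = LifecycleFn()
run(fn, [[1, 2], [3]])
```

Work that only needs to happen once per instance sits naturally in setup and teardown, which run once here, whereas the bundle hooks run as many times as the runner chooses to cut bundles.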


Re: Conceptually, what are bundles?

Posted by Amit Sela <am...@gmail.com>.
On Wed, Jan 25, 2017 at 8:23 PM Thomas Groh <tg...@google.com.invalid>
wrote:

> I have a couple of points in addition to what Robert said
>
> Runners are permitted to determine bundle sizes as appropriate to their
> implementation, so long as bundles are atomically committed. The contents
> of a PCollection are independent of the bundling of that PCollection.
>
> Runners can process all elements within their own bundles (e.g.
> https://github.com/apache/beam/blob/a6810372b003adf24bdbe34ed764a63841af9b99/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L289),
> the entire input data, or anywhere in between.
>
Or, as Thomas mentioned, a runner could process an entire partition of the
data as a bundle
(https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java#L57).
It basically depends on the runner's internal processing model.


Re: Conceptually, what are bundles?

Posted by Thomas Groh <tg...@google.com.INVALID>.
I have a couple of points in addition to what Robert said

Runners are permitted to determine bundle sizes as appropriate to their
implementation, so long as bundles are atomically committed. The contents
of a PCollection are independent of the bundling of that PCollection.

Runners can process each element in its own bundle (e.g.
https://github.com/apache/beam/blob/a6810372b003adf24bdbe34ed764a63841af9b99/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L289),
treat the entire input data as a single bundle, or do anything in between.
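
The point that bundling does not affect the contents of a PCollection can be illustrated with a toy stage. This is a sketch in plain Python, not Beam code: split_into_bundles and run_stage are hypothetical helpers, and ordered lists are used only to keep the comparison simple (a real PCollection is unordered).

```python
from typing import Callable, Iterator, List, Sequence


def split_into_bundles(elements: Sequence[int], bundle_size: int) -> Iterator[Sequence[int]]:
    """Yield consecutive slices of at most bundle_size elements."""
    for start in range(0, len(elements), bundle_size):
        yield elements[start:start + bundle_size]


def run_stage(fn: Callable[[int], int], elements: Sequence[int], bundle_size: int) -> List[int]:
    """Apply fn element-wise, committing one bundle at a time."""
    output: List[int] = []
    for bundle in split_into_bundles(elements, bundle_size):
        staged = [fn(x) for x in bundle]   # results stay local until the bundle is done
        output.extend(staged)              # commit the whole bundle at once
    return output


def square(x: int) -> int:
    return x * x


data = [3, 1, 4, 1, 5, 9, 2, 6]

per_element = run_stage(square, data, bundle_size=1)          # one element per bundle
in_between = run_stage(square, data, bundle_size=3)           # some runner-chosen size
whole_input = run_stage(square, data, bundle_size=len(data))  # entire input as one bundle
```

All three runs produce the same output. Only the granularity of commits differs, and that is the part a runner is free to choose.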


Re: Conceptually, what are bundles?

Posted by Robert Bradshaw <ro...@google.com.INVALID>.
Bundles are simply the unit of commitment (retry) in the Beam SDK.
They're not really a model concept, but do leak from the
implementation into the API as it's not feasible to checkpoint every
individual process call, and this allows some state/compute/... to be
safely amortized across elements (either the results of all processed
elements in a bundle are sent downstream, or none are and the entire
bundle is retried).
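
The all-or-nothing behaviour can be sketched roughly as follows. This is plain Python rather than Beam code: run_bundle, flaky_double and the retry limit are hypothetical, and a real runner would persist committed results instead of appending to a list.

```python
from typing import Callable, Iterable, List


def run_bundle(process: Callable[[int], int],
               bundle: Iterable[int],
               downstream: List[int],
               max_attempts: int = 3) -> None:
    """Process one bundle atomically: emit all outputs or none."""
    elements = list(bundle)
    for _ in range(max_attempts):
        staged: List[int] = []  # outputs are staged, not yet visible downstream
        try:
            for element in elements:
                staged.append(process(element))
        except RuntimeError:
            continue  # discard the staged outputs and retry the whole bundle
        downstream.extend(staged)  # commit: all outputs become visible together
        return
    raise RuntimeError(f"bundle failed after {max_attempts} attempts")


calls = {"count": 0}


def flaky_double(x: int) -> int:
    """Hypothetical per-element function that fails once, on its third call."""
    calls["count"] += 1
    if calls["count"] == 3:
        raise RuntimeError("transient failure")
    return 2 * x


committed: List[int] = []
run_bundle(flaky_double, [1, 2, 3], committed)
```

The first attempt fails on the third element, so the two outputs already computed are thrown away and all three elements are processed again. Checkpointing happens once per bundle rather than once per element, which is the amortization described above.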
