You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Thomas Groh <tg...@google.com.INVALID> on 2016/06/07 18:44:36 UTC

DoFn Reuse

Hey everyone;

I'm starting to work on BEAM-38 (
https://issues.apache.org/jira/browse/BEAM-38), which enables an
optimization for runners with many small bundles. BEAM-38 allows runners to
reuse DoFn instances so long as that DoFn has not terminated abnormally.
This replaces the previous requirement that a DoFn be used for only a
single bundle if either of startBundle or finishBundle have been
overwritten.

DoFn deserialization-per-bundle can be a significance performance
bottleneck when there are many small bundles, as is common in streaming
executions. It has also surfaced as the cause of much of the current
slowness in the new InProcessRunner.

Existing Runners do not require any changes; they may choose to take
advantage of of the new optimization opportunity. However, user DoFns may
need to be revised to properly set up and tear down state in startBundle
and finishBundle, respectively, if the depended on only being used for a
single bundle.

The first two updates are already in pull requests:

PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
Javadoc to the new spec
PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
DirectRunner to reuse DoFns according to the new policy.

Yours,

Thomas

Re: DoFn Reuse

Posted by Thomas Groh <tg...@google.com.INVALID>.
A Bundle is an arbitrary collection of elements. A PCollection is divided
into bundles at the discretion of the runner. However, the bundles must
partition the input PCollection; each element is in exactly one bundle, and
each bundle is successfully committed exactly once in a successful pipeline.

Ben's distinction is useful - notably, in the second sequence, as the
bundle has been committed, the elements will not (and can not) be
reprocessed, and outputs can be entirely lost.


For ParDo, the existing sequence was <startBundle> <processElement>*
<finishBundle>, and the new sequence is (<startBundle> <processElement>*
<finishBundle>)*

The documentation for the earlier sequence is at
https://github.com/apache/incubator-beam/blob/0393a7917318baaa1e580259a74bff2c1dcbe6b8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L88
finishBundle is noted as being called whenever an input bundle is completed.

There is also documentation that permits the system to run multiple copies
of a DoFn, starting at
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L400;
in either case, completion includes the execution of the finishBundle()
method.
"
* Sometimes two or more {@link DoFn} instances will be running on the
* same bundle simultaneously, with the system taking the results of
* the first instance to complete successfully
"

On Wed, Jun 8, 2016 at 10:13 AM, Ben Chambers <bc...@google.com.invalid>
wrote:

> I think there is a difference:
>
> - If failure occurs after finishBundle() but before the consumption is
> committed, then the bundle may be reprocessed, which leads to duplicated
> calls to processElement() and finishBundle().
> - If failure occurs after consumption is committed but before
> finishBundle(), then those elements which may have buffered state in the
> DoFn but not had their side-effects fully processed (since the
> finishBundle() was responsible for that) are lost.
>
>
>
> On Wed, Jun 8, 2016 at 10:09 AM Raghu Angadi <ra...@google.com.invalid>
> wrote:
>
> > On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi <ra...@google.com>
> wrote:
> > >
> > > I thought finishBundle() exists simply as best-effort indication from
> the
> > > runner to user some chunk of records have been processed..
> >
> > also to help with DoFn's own clean up if there is any.
> >
>

Re: DoFn Reuse

Posted by Maximilian Michels <mx...@apache.org>.
Thanks for the clarification, Thomas. I think we have to improve the
bundle execution of the Flink Runner. It is also not uniform among
batch/streaming execution and different operators.

On Wed, Jun 8, 2016 at 7:43 PM, Raghu Angadi <ra...@google.com.invalid> wrote:
> On Wed, Jun 8, 2016 at 10:39 AM, Ben Chambers <bc...@google.com.invalid>
> wrote:
>
>>
>> To clarify -- this case is actually not allowed by the beam model. The
>> guarantee is that either a bundle is successfully completed (startBundle,
>> processElement*, finishBundle, commit) or not. If it isn't, then the bundle
>> is reprocessed. So, if a `DoFn` instance builds up any state while
>> processing a bundle and a failure happens at any point prior to the commit,
>> it will be retried. Even though the actual state in the first `DoFn` was
>> lost, the second attempt will build up the same state.
>
>
> Makes sense. I missed the fact that finshBundle(Context) could emit more
> records, which affects the pipeline state.

Re: DoFn Reuse

Posted by Raghu Angadi <ra...@google.com.INVALID>.
On Wed, Jun 8, 2016 at 10:39 AM, Ben Chambers <bc...@google.com.invalid>
wrote:

>
> To clarify -- this case is actually not allowed by the beam model. The
> guarantee is that either a bundle is successfully completed (startBundle,
> processElement*, finishBundle, commit) or not. If it isn't, then the bundle
> is reprocessed. So, if a `DoFn` instance builds up any state while
> processing a bundle and a failure happens at any point prior to the commit,
> it will be retried. Even though the actual state in the first `DoFn` was
> lost, the second attempt will build up the same state.


Makes sense. I missed the fact that finshBundle(Context) could emit more
records, which affects the pipeline state.

Re: DoFn Reuse

Posted by Ben Chambers <bc...@google.com.INVALID>.
On Wed, Jun 8, 2016 at 10:29 AM Raghu Angadi <ra...@google.com.invalid>
wrote:

> On Wed, Jun 8, 2016 at 10:13 AM, Ben Chambers <bchambers@google.com.invalid
> >
> wrote:
>
> > - If failure occurs after finishBundle() but before the consumption is
> > committed, then the bundle may be reprocessed, which leads to duplicated
> > calls to processElement() and finishBundle().
> >
>
>
>
> > - If failure occurs after consumption is committed but before
> > finishBundle(), then those elements which may have buffered state in the
> > DoFn but not had their side-effects fully processed (since the
> > finishBundle() was responsible for that) are lost.
> >

I am trying to understand this better. Does this mean during
> recovery/replay after a failure, the particular instance of DoFn that
> existed before the worker failure would not be discarded, but might still
> receive elements?  If a DoFn is caching some internal state, it should
> always assume the worker its on might abruptly fail anytime and the state
> would be lost, right?
>

To clarify -- this case is actually not allowed by the beam model. The
guarantee is that either a bundle is successfully completed (startBundle,
processElement*, finishBundle, commit) or not. If it isn't, then the bundle
is reprocessed. So, if a `DoFn` instance builds up any state while
processing a bundle and a failure happens at any point prior to the commit,
it will be retried. Even though the actual state in the first `DoFn` was
lost, the second attempt will build up the same state.

Re: DoFn Reuse

Posted by Thomas Groh <tg...@google.com.INVALID>.
In the case of failure, a DoFn instance will not be reused; however, in the
case of failure either the inputs will be retried, or the pipeline will
fail, allowing a newly deserialized instance of the DoFn to reprocess the
inputs (which should produce the same result, meaning there is no data
loss).

On Wed, Jun 8, 2016 at 10:29 AM, Raghu Angadi <ra...@google.com.invalid>
wrote:

> On Wed, Jun 8, 2016 at 10:13 AM, Ben Chambers <bchambers@google.com.invalid
> >
> wrote:
>
> > - If failure occurs after finishBundle() but before the consumption is
> > committed, then the bundle may be reprocessed, which leads to duplicated
> > calls to processElement() and finishBundle().
> >
>
>
>
> > - If failure occurs after consumption is committed but before
> > finishBundle(), then those elements which may have buffered state in the
> > DoFn but not had their side-effects fully processed (since the
> > finishBundle() was responsible for that) are lost.
> >
>
> I am trying to understand this better. Does this mean during
> recovery/replay after a failure, the particular instance of DoFn that
> existed before the worker failure would not be discarded, but might still
> receive elements?  If a DoFn is caching some internal state, it should
> always assume the worker its on might abruptly fail anytime and the state
> would be lost, right?
>

Re: DoFn Reuse

Posted by Raghu Angadi <ra...@google.com.INVALID>.
On Wed, Jun 8, 2016 at 10:13 AM, Ben Chambers <bc...@google.com.invalid>
wrote:

> - If failure occurs after finishBundle() but before the consumption is
> committed, then the bundle may be reprocessed, which leads to duplicated
> calls to processElement() and finishBundle().
>



> - If failure occurs after consumption is committed but before
> finishBundle(), then those elements which may have buffered state in the
> DoFn but not had their side-effects fully processed (since the
> finishBundle() was responsible for that) are lost.
>

I am trying to understand this better. Does this mean during
recovery/replay after a failure, the particular instance of DoFn that
existed before the worker failure would not be discarded, but might still
receive elements?  If a DoFn is caching some internal state, it should
always assume the worker its on might abruptly fail anytime and the state
would be lost, right?

Re: DoFn Reuse

Posted by Ben Chambers <bc...@google.com.INVALID>.
I think there is a difference:

- If failure occurs after finishBundle() but before the consumption is
committed, then the bundle may be reprocessed, which leads to duplicated
calls to processElement() and finishBundle().
- If failure occurs after consumption is committed but before
finishBundle(), then those elements which may have buffered state in the
DoFn but not had their side-effects fully processed (since the
finishBundle() was responsible for that) are lost.



On Wed, Jun 8, 2016 at 10:09 AM Raghu Angadi <ra...@google.com.invalid>
wrote:

> On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi <ra...@google.com> wrote:
> >
> > I thought finishBundle() exists simply as best-effort indication from the
> > runner to user some chunk of records have been processed..
>
> also to help with DoFn's own clean up if there is any.
>

Re: DoFn Reuse

Posted by Raghu Angadi <ra...@google.com.INVALID>.
On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi <ra...@google.com> wrote:
>
> I thought finishBundle() exists simply as best-effort indication from the
> runner to user some chunk of records have been processed..

also to help with DoFn's own clean up if there is any.

Re: DoFn Reuse

Posted by Robert Bradshaw <ro...@google.com.INVALID>.
The unit of commit is the bundle.

Consider a DoFn that does batching (e.g. to interact with some
external service less frequently). Items may be buffered during
process() but these buffered items must be processed and the results
emitted in finishBundle(). If inputs are committed as being consumed
before finishBundle is called (and its outputs committed) this
buffered state would be lost but the inputs not replayed.

Put another way, the elements are partitioned into bundles, and the
exactly once guarantee states that the output is the union of exactly
one processing of each bundle. (Bundles may be retried and/or
partially processed; such outputs are discarded.)

On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi
<ra...@google.com.invalid> wrote:
> Such data loss can still occur if the worker dies after finishBundle()
> returns, but before the consumption is committed. I thought finishBundle()
> exists simply as best-effort indication from the runner to user some chunk
> of records have been processed.. not part of processing guarantees. Also
> the term "bundle" itself is fairly loosely defined (may be intentionally).
>
> On Wed, Jun 8, 2016 at 8:47 AM, Thomas Groh <tg...@google.com.invalid>
> wrote:
>
>> finishBundle() **must** be called before any input consumption is committed
>> (i.e. marking inputs as completed, which incldues committing any elements
>> they produced). Doing otherwise can cause data loss, as the state of the
>> DoFn is lost if a worker dies, but the input elements will never be
>> reprocessed to recreate the DoFn state. If this occurs, any buffered
>> outputs are lost.
>>
>> On Wed, Jun 8, 2016 at 8:21 AM, Bobby Evans <ev...@yahoo-inc.com.invalid>
>> wrote:
>>
>> > The local java runner does arbitrary batching of 10 elements.
>> >
>> > I'm not sure if flink exposes this or not, but couldn't you use the
>> > checkpoint triggers to also start/finish a bundle?
>> >  - Bobby
>> >
>> >     On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek <
>> > aljoscha@apache.org> wrote:
>> >
>> >
>> >  Ahh, what we could do is artificially induce bundles using either count
>> or
>> > processing time or both. Just so that finishBundle() is called once in a
>> > while.
>> >
>> > On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek <al...@apache.org>
>> wrote:
>> >
>> > > Pretty sure, yes. The Iterable in a MapPartitionFunction should give
>> you
>> > > all the values in a given partition.
>> > >
>> > > I checked again for streaming execution. We're doing the opposite,
>> right
>> > > now: every element is a bundle in itself, startBundle()/finishBundle()
>> > are
>> > > called for every element which seems a bit wasteful. The only other
>> > option
>> > > is to see all elements as one bundle, because Flink does not
>> bundle/micro
>> > > batch elements in streaming execution.
>> > >
>> > > On Wed, 8 Jun 2016 at 16:38 Bobby Evans <ev...@yahoo-inc.com.invalid>
>> > > wrote:
>> > >
>> > >> Are you sure about that for Flink?  I thought the iterable finished
>> when
>> > >> you processed a maximum number of elements or the input queue was
>> empty
>> > so
>> > >> that it could returned control back to akka for better sharing of the
>> > >> thread pool.
>> > >>
>> > >>
>> > >>
>> >
>> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
>> > >> Also in the javadocs for DoFn.Context it explicitly states that you
>> can
>> > >> emit from the finishBundle method.
>> > >>
>> > >>
>> > >>
>> >
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
>> > >> I thought I had seen some example of this being used for batching
>> output
>> > >> to something downstream, like HDFS or Kafka, but I'm not sure on that.
>> > If
>> > >> you can emit from finsihBundle and an new instance of the DoFn will be
>> > >> created around each bundle then I can see some people trying to do
>> > >> aggregations inside a DoFn and then emitting them at the end of the
>> > bundle
>> > >> knowing that if a batch fails or is rolled back the system will handle
>> > it.
>> > >> If that is not allowed we should really update the javadocs around it
>> to
>> > >> explain the pitfalls of doing this.
>> > >>  - Bobby
>> > >>
>> > >>    On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
>> > >> aljoscha@apache.org> wrote:
>> > >>
>> > >>
>> > >>  Hi,
>> > >> a quick related question: In the Flink runner we basically see
>> > everything
>> > >> as one big bundle, i.e. we call startBundle() once at the beginning
>> and
>> > >> then keep processing indefinitely, never calling finishBundle(). Is
>> this
>> > >> also correct behavior?
>> > >>
>> > >> Best,
>> > >> Aljoscha
>> > >>
>> > >> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid>
>> > wrote:
>> > >>
>> > >> > Hey everyone;
>> > >> >
>> > >> > I'm starting to work on BEAM-38 (
>> > >> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
>> > >> > optimization for runners with many small bundles. BEAM-38 allows
>> > >> runners to
>> > >> > reuse DoFn instances so long as that DoFn has not terminated
>> > abnormally.
>> > >> > This replaces the previous requirement that a DoFn be used for only
>> a
>> > >> > single bundle if either of startBundle or finishBundle have been
>> > >> > overwritten.
>> > >> >
>> > >> > DoFn deserialization-per-bundle can be a significance performance
>> > >> > bottleneck when there are many small bundles, as is common in
>> > streaming
>> > >> > executions. It has also surfaced as the cause of much of the current
>> > >> > slowness in the new InProcessRunner.
>> > >> >
>> > >> > Existing Runners do not require any changes; they may choose to take
>> > >> > advantage of of the new optimization opportunity. However, user
>> DoFns
>> > >> may
>> > >> > need to be revised to properly set up and tear down state in
>> > startBundle
>> > >> > and finishBundle, respectively, if the depended on only being used
>> > for a
>> > >> > single bundle.
>> > >> >
>> > >> > The first two updates are already in pull requests:
>> > >> >
>> > >> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates
>> > the
>> > >> > Javadoc to the new spec
>> > >> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates
>> > the
>> > >> > DirectRunner to reuse DoFns according to the new policy.
>> > >> >
>> > >> > Yours,
>> > >> >
>> > >> > Thomas
>> > >> >
>> > >>
>> > >>
>> > >>
>> > >
>> > >
>> >
>> >
>> >
>> >
>>

Re: DoFn Reuse

Posted by Raghu Angadi <ra...@google.com.INVALID>.
On Wed, Jun 8, 2016 at 10:15 AM, Dan Halperin <dh...@google.com.invalid>
wrote:

> > I thought finishBundle()
> > exists simply as best-effort indication from the runner to user some
> chunk
> > of records have been processed.. not part of processing guarantees. Also
> > the term "bundle" itself is fairly loosely defined (may be
> intentionally).
> >
>
> No, finish bundle MUST be called by a runner before it can commit any work.
> This
> is akin to flushing a stream before closing it -- the DoFn may have some
> elements
> cached or pending and if you don't call finish bundle you will not have
> fully
> processed or produced all the elements.


I see. finshBundle() includes context too (DoFn could output more elements
e.g.). Yeah it should be called before the runner can commit/checkpoint.

Re: DoFn Reuse

Posted by Dan Halperin <dh...@google.com.INVALID>.
On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi <ra...@google.com.invalid>
wrote:

> Such data loss can still occur if the worker dies after finishBundle()
> returns, but before the consumption is committed.


If the runner is correctly implemented, there will not be data loss in this
case -- the runner
should retry the bundle (or all the elements that were in this bundle as
part of one or more new bundles)
as it has not committed the work.


> I thought finishBundle()
> exists simply as best-effort indication from the runner to user some chunk
> of records have been processed.. not part of processing guarantees. Also
> the term "bundle" itself is fairly loosely defined (may be intentionally).
>

No, finish bundle MUST be called by a runner before it can commit any work.
This
is akin to flushing a stream before closing it -- the DoFn may have some
elements
cached or pending and if you don't call finish bundle you will not have
fully
processed or produced all the elements.

Dan



>
> On Wed, Jun 8, 2016 at 8:47 AM, Thomas Groh <tg...@google.com.invalid>
> wrote:
>
> > finishBundle() **must** be called before any input consumption is
> committed
> > (i.e. marking inputs as completed, which incldues committing any elements
> > they produced). Doing otherwise can cause data loss, as the state of the
> > DoFn is lost if a worker dies, but the input elements will never be
> > reprocessed to recreate the DoFn state. If this occurs, any buffered
> > outputs are lost.
> >
> > On Wed, Jun 8, 2016 at 8:21 AM, Bobby Evans <evans@yahoo-inc.com.invalid
> >
> > wrote:
> >
> > > The local java runner does arbitrary batching of 10 elements.
> > >
> > > I'm not sure if flink exposes this or not, but couldn't you use the
> > > checkpoint triggers to also start/finish a bundle?
> > >  - Bobby
> > >
> > >     On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek <
> > > aljoscha@apache.org> wrote:
> > >
> > >
> > >  Ahh, what we could do is artificially induce bundles using either
> count
> > or
> > > processing time or both. Just so that finishBundle() is called once in
> a
> > > while.
> > >
> > > On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek <al...@apache.org>
> > wrote:
> > >
> > > > Pretty sure, yes. The Iterable in a MapPartitionFunction should give
> > you
> > > > all the values in a given partition.
> > > >
> > > > I checked again for streaming execution. We're doing the opposite,
> > right
> > > > now: every element is a bundle in itself,
> startBundle()/finishBundle()
> > > are
> > > > called for every element which seems a bit wasteful. The only other
> > > option
> > > > is to see all elements as one bundle, because Flink does not
> > bundle/micro
> > > > batch elements in streaming execution.
> > > >
> > > > On Wed, 8 Jun 2016 at 16:38 Bobby Evans <evans@yahoo-inc.com.invalid
> >
> > > > wrote:
> > > >
> > > >> Are you sure about that for Flink?  I thought the iterable finished
> > when
> > > >> you processed a maximum number of elements or the input queue was
> > empty
> > > so
> > > >> that it could returned control back to akka for better sharing of
> the
> > > >> thread pool.
> > > >>
> > > >>
> > > >>
> > >
> >
> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
> > > >> Also in the javadocs for DoFn.Context it explicitly states that you
> > can
> > > >> emit from the finishBundle method.
> > > >>
> > > >>
> > > >>
> > >
> >
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
> > > >> I thought I had seen some example of this being used for batching
> > output
> > > >> to something downstream, like HDFS or Kafka, but I'm not sure on
> that.
> > > If
> > > >> you can emit from finsihBundle and an new instance of the DoFn will
> be
> > > >> created around each bundle then I can see some people trying to do
> > > >> aggregations inside a DoFn and then emitting them at the end of the
> > > bundle
> > > >> knowing that if a batch fails or is rolled back the system will
> handle
> > > it.
> > > >> If that is not allowed we should really update the javadocs around
> it
> > to
> > > >> explain the pitfalls of doing this.
> > > >>  - Bobby
> > > >>
> > > >>    On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
> > > >> aljoscha@apache.org> wrote:
> > > >>
> > > >>
> > > >>  Hi,
> > > >> a quick related question: In the Flink runner we basically see
> > > everything
> > > >> as one big bundle, i.e. we call startBundle() once at the beginning
> > and
> > > >> then keep processing indefinitely, never calling finishBundle(). Is
> > this
> > > >> also correct behavior?
> > > >>
> > > >> Best,
> > > >> Aljoscha
> > > >>
> > > >> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid>
> > > wrote:
> > > >>
> > > >> > Hey everyone;
> > > >> >
> > > >> > I'm starting to work on BEAM-38 (
> > > >> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
> > > >> > optimization for runners with many small bundles. BEAM-38 allows
> > > >> runners to
> > > >> > reuse DoFn instances so long as that DoFn has not terminated
> > > abnormally.
> > > >> > This replaces the previous requirement that a DoFn be used for
> only
> > a
> > > >> > single bundle if either of startBundle or finishBundle have been
> > > >> > overwritten.
> > > >> >
> > > >> > DoFn deserialization-per-bundle can be a significance performance
> > > >> > bottleneck when there are many small bundles, as is common in
> > > streaming
> > > >> > executions. It has also surfaced as the cause of much of the
> current
> > > >> > slowness in the new InProcessRunner.
> > > >> >
> > > >> > Existing Runners do not require any changes; they may choose to
> take
> > > >> > advantage of of the new optimization opportunity. However, user
> > DoFns
> > > >> may
> > > >> > need to be revised to properly set up and tear down state in
> > > startBundle
> > > >> > and finishBundle, respectively, if the depended on only being used
> > > for a
> > > >> > single bundle.
> > > >> >
> > > >> > The first two updates are already in pull requests:
> > > >> >
> > > >> > PR #419 (https://github.com/apache/incubator-beam/pull/419)
> updates
> > > the
> > > >> > Javadoc to the new spec
> > > >> > PR #418 (https://github.com/apache/incubator-beam/pull/418)
> updates
> > > the
> > > >> > DirectRunner to reuse DoFns according to the new policy.
> > > >> >
> > > >> > Yours,
> > > >> >
> > > >> > Thomas
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > >
> > >
> > >
> > >
> >
>

Re: DoFn Reuse

Posted by Raghu Angadi <ra...@google.com.INVALID>.
Such data loss can still occur if the worker dies after finishBundle()
returns, but before the consumption is committed. I thought finishBundle()
exists simply as best-effort indication from the runner to user some chunk
of records have been processed.. not part of processing guarantees. Also
the term "bundle" itself is fairly loosely defined (may be intentionally).

On Wed, Jun 8, 2016 at 8:47 AM, Thomas Groh <tg...@google.com.invalid>
wrote:

> finishBundle() **must** be called before any input consumption is committed
> (i.e. marking inputs as completed, which incldues committing any elements
> they produced). Doing otherwise can cause data loss, as the state of the
> DoFn is lost if a worker dies, but the input elements will never be
> reprocessed to recreate the DoFn state. If this occurs, any buffered
> outputs are lost.
>
> On Wed, Jun 8, 2016 at 8:21 AM, Bobby Evans <ev...@yahoo-inc.com.invalid>
> wrote:
>
> > The local java runner does arbitrary batching of 10 elements.
> >
> > I'm not sure if flink exposes this or not, but couldn't you use the
> > checkpoint triggers to also start/finish a bundle?
> >  - Bobby
> >
> >     On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek <
> > aljoscha@apache.org> wrote:
> >
> >
> >  Ahh, what we could do is artificially induce bundles using either count
> or
> > processing time or both. Just so that finishBundle() is called once in a
> > while.
> >
> > On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek <al...@apache.org>
> wrote:
> >
> > > Pretty sure, yes. The Iterable in a MapPartitionFunction should give
> you
> > > all the values in a given partition.
> > >
> > > I checked again for streaming execution. We're doing the opposite,
> right
> > > now: every element is a bundle in itself, startBundle()/finishBundle()
> > are
> > > called for every element which seems a bit wasteful. The only other
> > option
> > > is to see all elements as one bundle, because Flink does not
> bundle/micro
> > > batch elements in streaming execution.
> > >
> > > On Wed, 8 Jun 2016 at 16:38 Bobby Evans <ev...@yahoo-inc.com.invalid>
> > > wrote:
> > >
> > >> Are you sure about that for Flink?  I thought the iterable finished
> when
> > >> you processed a maximum number of elements or the input queue was
> empty
> > so
> > >> that it could returned control back to akka for better sharing of the
> > >> thread pool.
> > >>
> > >>
> > >>
> >
> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
> > >> Also in the javadocs for DoFn.Context it explicitly states that you
> can
> > >> emit from the finishBundle method.
> > >>
> > >>
> > >>
> >
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
> > >> I thought I had seen some example of this being used for batching
> output
> > >> to something downstream, like HDFS or Kafka, but I'm not sure on that.
> > If
> > >> you can emit from finsihBundle and an new instance of the DoFn will be
> > >> created around each bundle then I can see some people trying to do
> > >> aggregations inside a DoFn and then emitting them at the end of the
> > bundle
> > >> knowing that if a batch fails or is rolled back the system will handle
> > it.
> > >> If that is not allowed we should really update the javadocs around it
> to
> > >> explain the pitfalls of doing this.
> > >>  - Bobby
> > >>
> > >>    On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
> > >> aljoscha@apache.org> wrote:
> > >>
> > >>
> > >>  Hi,
> > >> a quick related question: In the Flink runner we basically see
> > everything
> > >> as one big bundle, i.e. we call startBundle() once at the beginning
> and
> > >> then keep processing indefinitely, never calling finishBundle(). Is
> this
> > >> also correct behavior?
> > >>
> > >> Best,
> > >> Aljoscha
> > >>
> > >> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid>
> > wrote:
> > >>
> > >> > Hey everyone;
> > >> >
> > >> > I'm starting to work on BEAM-38 (
> > >> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
> > >> > optimization for runners with many small bundles. BEAM-38 allows
> > >> runners to
> > >> > reuse DoFn instances so long as that DoFn has not terminated
> > abnormally.
> > >> > This replaces the previous requirement that a DoFn be used for only
> a
> > >> > single bundle if either of startBundle or finishBundle have been
> > >> > overwritten.
> > >> >
> > >> > DoFn deserialization-per-bundle can be a significance performance
> > >> > bottleneck when there are many small bundles, as is common in
> > streaming
> > >> > executions. It has also surfaced as the cause of much of the current
> > >> > slowness in the new InProcessRunner.
> > >> >
> > >> > Existing Runners do not require any changes; they may choose to take
> > >> > advantage of of the new optimization opportunity. However, user
> DoFns
> > >> may
> > >> > need to be revised to properly set up and tear down state in
> > startBundle
> > >> > and finishBundle, respectively, if the depended on only being used
> > for a
> > >> > single bundle.
> > >> >
> > >> > The first two updates are already in pull requests:
> > >> >
> > >> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates
> > the
> > >> > Javadoc to the new spec
> > >> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates
> > the
> > >> > DirectRunner to reuse DoFns according to the new policy.
> > >> >
> > >> > Yours,
> > >> >
> > >> > Thomas
> > >> >
> > >>
> > >>
> > >>
> > >
> > >
> >
> >
> >
> >
>

Re: DoFn Reuse

Posted by Thomas Groh <tg...@google.com.INVALID>.
finishBundle() **must** be called before any input consumption is committed
(i.e. marking inputs as completed, which incldues committing any elements
they produced). Doing otherwise can cause data loss, as the state of the
DoFn is lost if a worker dies, but the input elements will never be
reprocessed to recreate the DoFn state. If this occurs, any buffered
outputs are lost.

On Wed, Jun 8, 2016 at 8:21 AM, Bobby Evans <ev...@yahoo-inc.com.invalid>
wrote:

> The local java runner does arbitrary batching of 10 elements.
>
> I'm not sure if flink exposes this or not, but couldn't you use the
> checkpoint triggers to also start/finish a bundle?
>  - Bobby
>
>     On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek <
> aljoscha@apache.org> wrote:
>
>
>  Ahh, what we could do is artificially induce bundles using either count or
> processing time or both. Just so that finishBundle() is called once in a
> while.
>
> On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek <al...@apache.org> wrote:
>
> > Pretty sure, yes. The Iterable in a MapPartitionFunction should give you
> > all the values in a given partition.
> >
> > I checked again for streaming execution. We're doing the opposite, right
> > now: every element is a bundle in itself, startBundle()/finishBundle()
> are
> > called for every element which seems a bit wasteful. The only other
> option
> > is to see all elements as one bundle, because Flink does not bundle/micro
> > batch elements in streaming execution.
> >
> > On Wed, 8 Jun 2016 at 16:38 Bobby Evans <ev...@yahoo-inc.com.invalid>
> > wrote:
> >
> >> Are you sure about that for Flink?  I thought the iterable finished when
> >> you processed a maximum number of elements or the input queue was empty
> so
> >> that it could returned control back to akka for better sharing of the
> >> thread pool.
> >>
> >>
> >>
> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
> >> Also in the javadocs for DoFn.Context it explicitly states that you can
> >> emit from the finishBundle method.
> >>
> >>
> >>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
> >> I thought I had seen some example of this being used for batching output
> >> to something downstream, like HDFS or Kafka, but I'm not sure on that.
> If
> >> you can emit from finsihBundle and an new instance of the DoFn will be
> >> created around each bundle then I can see some people trying to do
> >> aggregations inside a DoFn and then emitting them at the end of the
> bundle
> >> knowing that if a batch fails or is rolled back the system will handle
> it.
> >> If that is not allowed we should really update the javadocs around it to
> >> explain the pitfalls of doing this.
> >>  - Bobby
> >>
> >>    On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
> >> aljoscha@apache.org> wrote:
> >>
> >>
> >>  Hi,
> >> a quick related question: In the Flink runner we basically see
> everything
> >> as one big bundle, i.e. we call startBundle() once at the beginning and
> >> then keep processing indefinitely, never calling finishBundle(). Is this
> >> also correct behavior?
> >>
> >> Best,
> >> Aljoscha
> >>
> >> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid>
> wrote:
> >>
> >> > Hey everyone;
> >> >
> >> > I'm starting to work on BEAM-38 (
> >> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
> >> > optimization for runners with many small bundles. BEAM-38 allows
> >> runners to
> >> > reuse DoFn instances so long as that DoFn has not terminated
> abnormally.
> >> > This replaces the previous requirement that a DoFn be used for only a
> >> > single bundle if either of startBundle or finishBundle have been
> >> > overwritten.
> >> >
> >> > DoFn deserialization-per-bundle can be a significance performance
> >> > bottleneck when there are many small bundles, as is common in
> streaming
> >> > executions. It has also surfaced as the cause of much of the current
> >> > slowness in the new InProcessRunner.
> >> >
> >> > Existing Runners do not require any changes; they may choose to take
> >> > advantage of of the new optimization opportunity. However, user DoFns
> >> may
> >> > need to be revised to properly set up and tear down state in
> startBundle
> >> > and finishBundle, respectively, if the depended on only being used
> for a
> >> > single bundle.
> >> >
> >> > The first two updates are already in pull requests:
> >> >
> >> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates
> the
> >> > Javadoc to the new spec
> >> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates
> the
> >> > DirectRunner to reuse DoFns according to the new policy.
> >> >
> >> > Yours,
> >> >
> >> > Thomas
> >> >
> >>
> >>
> >>
> >
> >
>
>
>
>

Re: DoFn Reuse

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
The local java runner does arbitrary batching of 10 elements.

I'm not sure if flink exposes this or not, but couldn't you use the checkpoint triggers to also start/finish a bundle?
 - Bobby 

    On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek <al...@apache.org> wrote:
 

 Ahh, what we could do is artificially induce bundles using either count or
processing time or both. Just so that finishBundle() is called once in a
while.

On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek <al...@apache.org> wrote:

> Pretty sure, yes. The Iterable in a MapPartitionFunction should give you
> all the values in a given partition.
>
> I checked again for streaming execution. We're doing the opposite, right
> now: every element is a bundle in itself, startBundle()/finishBundle() are
> called for every element which seems a bit wasteful. The only other option
> is to see all elements as one bundle, because Flink does not bundle/micro
> batch elements in streaming execution.
>
> On Wed, 8 Jun 2016 at 16:38 Bobby Evans <ev...@yahoo-inc.com.invalid>
> wrote:
>
>> Are you sure about that for Flink?  I thought the iterable finished when
>> you processed a maximum number of elements or the input queue was empty so
>> that it could returned control back to akka for better sharing of the
>> thread pool.
>>
>>
>> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
>> Also in the javadocs for DoFn.Context it explicitly states that you can
>> emit from the finishBundle method.
>>
>>
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
>> I thought I had seen some example of this being used for batching output
>> to something downstream, like HDFS or Kafka, but I'm not sure on that.  If
>> you can emit from finsihBundle and an new instance of the DoFn will be
>> created around each bundle then I can see some people trying to do
>> aggregations inside a DoFn and then emitting them at the end of the bundle
>> knowing that if a batch fails or is rolled back the system will handle it.
>> If that is not allowed we should really update the javadocs around it to
>> explain the pitfalls of doing this.
>>  - Bobby
>>
>>    On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
>> aljoscha@apache.org> wrote:
>>
>>
>>  Hi,
>> a quick related question: In the Flink runner we basically see everything
>> as one big bundle, i.e. we call startBundle() once at the beginning and
>> then keep processing indefinitely, never calling finishBundle(). Is this
>> also correct behavior?
>>
>> Best,
>> Aljoscha
>>
>> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid> wrote:
>>
>> > Hey everyone;
>> >
>> > I'm starting to work on BEAM-38 (
>> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
>> > optimization for runners with many small bundles. BEAM-38 allows
>> runners to
>> > reuse DoFn instances so long as that DoFn has not terminated abnormally.
>> > This replaces the previous requirement that a DoFn be used for only a
>> > single bundle if either of startBundle or finishBundle have been
>> > overwritten.
>> >
>> > DoFn deserialization-per-bundle can be a significance performance
>> > bottleneck when there are many small bundles, as is common in streaming
>> > executions. It has also surfaced as the cause of much of the current
>> > slowness in the new InProcessRunner.
>> >
>> > Existing Runners do not require any changes; they may choose to take
>> > advantage of of the new optimization opportunity. However, user DoFns
>> may
>> > need to be revised to properly set up and tear down state in startBundle
>> > and finishBundle, respectively, if the depended on only being used for a
>> > single bundle.
>> >
>> > The first two updates are already in pull requests:
>> >
>> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
>> > Javadoc to the new spec
>> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
>> > DirectRunner to reuse DoFns according to the new policy.
>> >
>> > Yours,
>> >
>> > Thomas
>> >
>>
>>
>>
>
>


  

Re: DoFn Reuse

Posted by Aljoscha Krettek <al...@apache.org>.
Ahh, what we could do is artificially induce bundles using either count or
processing time or both. Just so that finishBundle() is called once in a
while.

On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek <al...@apache.org> wrote:

> Pretty sure, yes. The Iterable in a MapPartitionFunction should give you
> all the values in a given partition.
>
> I checked again for streaming execution. We're doing the opposite, right
> now: every element is a bundle in itself, startBundle()/finishBundle() are
> called for every element which seems a bit wasteful. The only other option
> is to see all elements as one bundle, because Flink does not bundle/micro
> batch elements in streaming execution.
>
> On Wed, 8 Jun 2016 at 16:38 Bobby Evans <ev...@yahoo-inc.com.invalid>
> wrote:
>
>> Are you sure about that for Flink?  I thought the iterable finished when
>> you processed a maximum number of elements or the input queue was empty so
>> that it could returned control back to akka for better sharing of the
>> thread pool.
>>
>>
>> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
>> Also in the javadocs for DoFn.Context it explicitly states that you can
>> emit from the finishBundle method.
>>
>>
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
>> I thought I had seen some example of this being used for batching output
>> to something downstream, like HDFS or Kafka, but I'm not sure on that.  If
>> you can emit from finsihBundle and an new instance of the DoFn will be
>> created around each bundle then I can see some people trying to do
>> aggregations inside a DoFn and then emitting them at the end of the bundle
>> knowing that if a batch fails or is rolled back the system will handle it.
>> If that is not allowed we should really update the javadocs around it to
>> explain the pitfalls of doing this.
>>  - Bobby
>>
>>     On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
>> aljoscha@apache.org> wrote:
>>
>>
>>  Hi,
>> a quick related question: In the Flink runner we basically see everything
>> as one big bundle, i.e. we call startBundle() once at the beginning and
>> then keep processing indefinitely, never calling finishBundle(). Is this
>> also correct behavior?
>>
>> Best,
>> Aljoscha
>>
>> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid> wrote:
>>
>> > Hey everyone;
>> >
>> > I'm starting to work on BEAM-38 (
>> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
>> > optimization for runners with many small bundles. BEAM-38 allows
>> runners to
>> > reuse DoFn instances so long as that DoFn has not terminated abnormally.
>> > This replaces the previous requirement that a DoFn be used for only a
>> > single bundle if either of startBundle or finishBundle have been
>> > overwritten.
>> >
>> > DoFn deserialization-per-bundle can be a significance performance
>> > bottleneck when there are many small bundles, as is common in streaming
>> > executions. It has also surfaced as the cause of much of the current
>> > slowness in the new InProcessRunner.
>> >
>> > Existing Runners do not require any changes; they may choose to take
>> > advantage of of the new optimization opportunity. However, user DoFns
>> may
>> > need to be revised to properly set up and tear down state in startBundle
>> > and finishBundle, respectively, if the depended on only being used for a
>> > single bundle.
>> >
>> > The first two updates are already in pull requests:
>> >
>> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
>> > Javadoc to the new spec
>> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
>> > DirectRunner to reuse DoFns according to the new policy.
>> >
>> > Yours,
>> >
>> > Thomas
>> >
>>
>>
>>
>
>

Re: DoFn Reuse

Posted by Aljoscha Krettek <al...@apache.org>.
Pretty sure, yes. The Iterable in a MapPartitionFunction should give you
all the values in a given partition.

I checked again for streaming execution. We're doing the opposite, right
now: every element is a bundle in itself, startBundle()/finishBundle() are
called for every element which seems a bit wasteful. The only other option
is to see all elements as one bundle, because Flink does not bundle/micro
batch elements in streaming execution.

On Wed, 8 Jun 2016 at 16:38 Bobby Evans <ev...@yahoo-inc.com.invalid> wrote:

> Are you sure about that for Flink?  I thought the iterable finished when
> you processed a maximum number of elements or the input queue was empty so
> that it could returned control back to akka for better sharing of the
> thread pool.
>
>
> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
> Also in the javadocs for DoFn.Context it explicitly states that you can
> emit from the finishBundle method.
>
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
> I thought I had seen some example of this being used for batching output
> to something downstream, like HDFS or Kafka, but I'm not sure on that.  If
> you can emit from finsihBundle and an new instance of the DoFn will be
> created around each bundle then I can see some people trying to do
> aggregations inside a DoFn and then emitting them at the end of the bundle
> knowing that if a batch fails or is rolled back the system will handle it.
> If that is not allowed we should really update the javadocs around it to
> explain the pitfalls of doing this.
>  - Bobby
>
>     On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
> aljoscha@apache.org> wrote:
>
>
>  Hi,
> a quick related question: In the Flink runner we basically see everything
> as one big bundle, i.e. we call startBundle() once at the beginning and
> then keep processing indefinitely, never calling finishBundle(). Is this
> also correct behavior?
>
> Best,
> Aljoscha
>
> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid> wrote:
>
> > Hey everyone;
> >
> > I'm starting to work on BEAM-38 (
> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
> > optimization for runners with many small bundles. BEAM-38 allows runners
> to
> > reuse DoFn instances so long as that DoFn has not terminated abnormally.
> > This replaces the previous requirement that a DoFn be used for only a
> > single bundle if either of startBundle or finishBundle have been
> > overwritten.
> >
> > DoFn deserialization-per-bundle can be a significance performance
> > bottleneck when there are many small bundles, as is common in streaming
> > executions. It has also surfaced as the cause of much of the current
> > slowness in the new InProcessRunner.
> >
> > Existing Runners do not require any changes; they may choose to take
> > advantage of of the new optimization opportunity. However, user DoFns may
> > need to be revised to properly set up and tear down state in startBundle
> > and finishBundle, respectively, if the depended on only being used for a
> > single bundle.
> >
> > The first two updates are already in pull requests:
> >
> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
> > Javadoc to the new spec
> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
> > DirectRunner to reuse DoFns according to the new policy.
> >
> > Yours,
> >
> > Thomas
> >
>
>
>

Re: DoFn Reuse

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
Are you sure about that for Flink?  I thought the iterable finished when you processed a maximum number of elements or the input queue was empty so that it could returned control back to akka for better sharing of the thread pool.

https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
Also in the javadocs for DoFn.Context it explicitly states that you can emit from the finishBundle method.

https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
I thought I had seen some example of this being used for batching output to something downstream, like HDFS or Kafka, but I'm not sure on that.  If you can emit from finsihBundle and an new instance of the DoFn will be created around each bundle then I can see some people trying to do aggregations inside a DoFn and then emitting them at the end of the bundle knowing that if a batch fails or is rolled back the system will handle it.  If that is not allowed we should really update the javadocs around it to explain the pitfalls of doing this.
 - Bobby 

    On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <al...@apache.org> wrote:
 

 Hi,
a quick related question: In the Flink runner we basically see everything
as one big bundle, i.e. we call startBundle() once at the beginning and
then keep processing indefinitely, never calling finishBundle(). Is this
also correct behavior?

Best,
Aljoscha

On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid> wrote:

> Hey everyone;
>
> I'm starting to work on BEAM-38 (
> https://issues.apache.org/jira/browse/BEAM-38), which enables an
> optimization for runners with many small bundles. BEAM-38 allows runners to
> reuse DoFn instances so long as that DoFn has not terminated abnormally.
> This replaces the previous requirement that a DoFn be used for only a
> single bundle if either of startBundle or finishBundle have been
> overwritten.
>
> DoFn deserialization-per-bundle can be a significance performance
> bottleneck when there are many small bundles, as is common in streaming
> executions. It has also surfaced as the cause of much of the current
> slowness in the new InProcessRunner.
>
> Existing Runners do not require any changes; they may choose to take
> advantage of of the new optimization opportunity. However, user DoFns may
> need to be revised to properly set up and tear down state in startBundle
> and finishBundle, respectively, if the depended on only being used for a
> single bundle.
>
> The first two updates are already in pull requests:
>
> PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
> Javadoc to the new spec
> PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
> DirectRunner to reuse DoFns according to the new policy.
>
> Yours,
>
> Thomas
>


  

Re: DoFn Reuse

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
a quick related question: In the Flink runner we basically see everything
as one big bundle, i.e. we call startBundle() once at the beginning and
then keep processing indefinitely, never calling finishBundle(). Is this
also correct behavior?

Best,
Aljoscha

On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid> wrote:

> Hey everyone;
>
> I'm starting to work on BEAM-38 (
> https://issues.apache.org/jira/browse/BEAM-38), which enables an
> optimization for runners with many small bundles. BEAM-38 allows runners to
> reuse DoFn instances so long as that DoFn has not terminated abnormally.
> This replaces the previous requirement that a DoFn be used for only a
> single bundle if either of startBundle or finishBundle have been
> overwritten.
>
> DoFn deserialization-per-bundle can be a significance performance
> bottleneck when there are many small bundles, as is common in streaming
> executions. It has also surfaced as the cause of much of the current
> slowness in the new InProcessRunner.
>
> Existing Runners do not require any changes; they may choose to take
> advantage of of the new optimization opportunity. However, user DoFns may
> need to be revised to properly set up and tear down state in startBundle
> and finishBundle, respectively, if the depended on only being used for a
> single bundle.
>
> The first two updates are already in pull requests:
>
> PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
> Javadoc to the new spec
> PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
> DirectRunner to reuse DoFns according to the new policy.
>
> Yours,
>
> Thomas
>