Posted to user@beam.apache.org by "Bergmann, Rico (GfK External)" <Ri...@ext.gfk.com> on 2016/12/15 06:14:23 UTC

Immutability requirement for UDF input

Hi!

In the Beam documentation I read that a DoFn should never modify any value of an incoming element (retrieved via ProcessContext.element(.)). I'm wondering when this would be a problem if I don't have any object reuse behavior in my execution environment. Can you give a hint as to where this might cause problems?

And a second question: How would I implement a DISTINCT transformation on a PCollection?

Thanks in advance,
Rico.

________________________________


GfK SE, Nuremberg, Germany, commercial register at the local court Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO), Matthias Hartmann, David Krajicek, Alessandra Cama; Chairman of the Supervisory Board: Ralf Klein-Bölting This email and any attachments may contain confidential or privileged information. Please note that unauthorized copying, disclosure or distribution of the material in this email is not permitted.

Re: Immutability requirement for UDF input

Posted by Frances Perry <fj...@google.com>.
On Wed, Dec 14, 2016 at 11:56 PM, Bergmann, Rico (GfK External) <
Rico.Bergmann@ext.gfk.com> wrote:

> Thanks for the quick answer!
>
>
>
> So the ParDo fusion will only be possible if I run different DoFns on the
> same input concurrently, right?
>

There are different types of ParDo fusion -- producer-consumer (a.k.a.
function composition), sibling, etc. And fusion is just one type of
optimization (graph transformation) that runners might do, along with
things like combiner lifting, flatten sinking, etc.

I'm sure there are a bunch of good links available about what different
runners do. Here are a couple that I'm most familiar with:

* Beam evolved out of a system called FlumeJava, and there's a paper from
2010 about FlumeJava's particular optimizations:
http://research.google.com/pubs/archive/35650.pdf
* The Cloud Dataflow Service currently runs Beam pipelines with a number of
optimizations:
https://cloud.google.com/dataflow/service/dataflow-service-desc


> Or vice versa, if I don’t have concurrent DoFns running on the same input
> it would be safe to modify the input element?
>

Generally the model allows runners to make any semantic-preserving
optimization they wish -- and the semantics of the Beam model say that
inputs should not be reused. (The DirectRunner tests for this by default,
so if a DoFn does modify inputs, you will get unit test failures.) It's
also not clear to me what various runners may do in the face of retries --
if a failure occurs partway through a computation and is retried, the model
would allow a runner to use the same copy of the input.

So you could proceed, but your behavior will be undefined. ;-)
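
To make the retry concern concrete, here is a minimal plain-Java sketch. It is a simulation only: RetrySketch, mutateThenMaybeFail and runWithRetry are hypothetical names, not Beam APIs, and it simply assumes that the runner re-executes a failed step with the same in-memory input object, which the model would permit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of a retried computation; none of these names are Beam APIs.
class RetrySketch {

    // Hypothetical processing step that mutates its input and can be told to fail afterwards.
    static int mutateThenMaybeFail(List<Integer> element, boolean fail) {
        element.add(element.size()); // in-place mutation of the input
        if (fail) {
            throw new IllegalStateException("simulated worker failure");
        }
        return element.size();
    }

    // Hypothetical runner loop: the first attempt fails, the retry reuses the SAME input object.
    static int runWithRetry(List<Integer> element) {
        try {
            return mutateThenMaybeFail(element, true);
        } catch (IllegalStateException e) {
            return mutateThenMaybeFail(element, false);
        }
    }

    public static void main(String[] args) {
        List<Integer> element = new ArrayList<>(Arrays.asList(10, 20));
        // A single clean attempt would return 3; the retry sees the failed attempt's mutation.
        System.out.println(runWithRetry(element)); // prints 4
    }
}
```

If the step built a new list instead of appending to its input, the retry would return the same result as a clean first attempt.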

However, I do think there are use cases (particularly those focused on
incremental changes to a few, large elements) where it would be useful for
the computation to be able to signal to the runner that it wishes to mutate
its input, even at the cost of preventing certain optimizations. Filed
https://issues.apache.org/jira/browse/BEAM-1164 to track this.


>
>
> Best,
>
> Rico.
>
>
>
>
>
> *From:* Frances Perry [mailto:fjp@google.com]
> *Sent:* Thursday, December 15, 2016 07:28
> *To:* user@beam.incubator.apache.org
> *Subject:* Re: Immutability requirement for UDF input
>
>
>
>
>
>
>
> On Wed, Dec 14, 2016 at 10:14 PM, Bergmann, Rico (GfK External) <
> Rico.Bergmann@ext.gfk.com> wrote:
>
> Hi!
>
>
>
> In the Beam documentation I read, that a DoFn should never modify any
> value of an incoming element (retrieved via ProcessContext.element(.)). I’m
> wondering, when this would be a problem, if I don’t have any object reuse
> behavior in my execution environment. Can you give a hint, where this might
> cause problems?
>
>
>
> The Beam model is designed specifically to give runners some flexibility
> to support efficient execution. For example, many runners do an
> optimization called ParDo fusion, where a given element is run through a
> tree of adjacent ParDos and only materialized at the leaves. In this case,
> the output of one ParDo is handed straight to consuming ParDos, which means
> a single output element may be handed to multiple sibling consumers.
> Determining if a DoFn mutates an element is tricky, and requiring the
> runner to always copy the element just in case the first DoFn mutated it
> would introduce a significant performance cost.
>
>
>
>  And a second question: How would I implement a DISTINCT transformation on
> a PCollection?
>
>
>
> We already have a Distinct transformation you can use, but you can also dive
> into the source code to see how it works!
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
>
>
>
>
>
> Thanks in advance,
>
> Rico.
>
>

Re: Immutability requirement for UDF input

Posted by "Bergmann, Rico (GfK External)" <Ri...@ext.gfk.com>.
Thanks for the quick answer!

So the ParDo fusion will only be possible if I run different DoFns on the same input concurrently, right? Or vice versa, if I don’t have concurrent DoFns running on the same input it would be safe to modify the input element?

Best,
Rico.


From: Frances Perry [mailto:fjp@google.com]
Sent: Thursday, December 15, 2016 07:28
To: user@beam.incubator.apache.org
Subject: Re: Immutability requirement for UDF input



On Wed, Dec 14, 2016 at 10:14 PM, Bergmann, Rico (GfK External) <Ri...@ext.gfk.com> wrote:
Hi!

In the Beam documentation I read, that a DoFn should never modify any value of an incoming element (retrieved via ProcessContext.element(.)). I’m wondering, when this would be a problem, if I don’t have any object reuse behavior in my execution environment. Can you give a hint, where this might cause problems?

The Beam model is designed specifically to give runners some flexibility to support efficient execution. For example, many runners do an optimization called ParDo fusion, where a given element is run through a tree of adjacent ParDos and only materialized at the leaves. In this case, the output of one ParDo is handed straight to consuming ParDos, which means a single output element may be handed to multiple sibling consumers. Determining if a DoFn mutates an element is tricky, and requiring the runner to always copy the element just in case the first DoFn mutated it would introduce a significant performance cost.

 And a second question: How would I implement a DISTINCT transformation on a PCollection?

We already have a Distinct transformation you can use, but you can also dive into the source code to see how it works!
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java


Thanks in advance,
Rico.

Re: Immutability requirement for UDF input

Posted by Frances Perry <fj...@google.com>.
On Wed, Dec 14, 2016 at 10:14 PM, Bergmann, Rico (GfK External) <
Rico.Bergmann@ext.gfk.com> wrote:

> Hi!
>
>
>
> In the Beam documentation I read, that a DoFn should never modify any
> value of an incoming element (retrieved via ProcessContext.element(.)). I’m
> wondering, when this would be a problem, if I don’t have any object reuse
> behavior in my execution environment. Can you give a hint, where this might
> cause problems?
>

The Beam model is designed specifically to give runners some flexibility
to support efficient execution. For example, many runners do an
optimization called ParDo fusion, where a given element is run through a
tree of adjacent ParDos and only materialized at the leaves. In this case,
the output of one ParDo is handed straight to consuming ParDos, which means
a single output element may be handed to multiple sibling consumers.
Determining if a DoFn mutates an element is tricky, and requiring the
runner to always copy the element just in case the first DoFn mutated it
would introduce a significant performance cost.
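
As a rough illustration of the sibling case, the following plain-Java sketch hands one object reference to two consumers without copying it. It is only a simulation under stated assumptions: SiblingFusionSketch, MUTATING, READ_ONLY and runFused are hypothetical names, not Beam APIs, and a real runner's fused stage is considerably more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java simulation of sibling fusion; none of these names are Beam APIs.
class SiblingFusionSketch {

    // Hypothetical consumer that (incorrectly) mutates its input in place.
    static final Function<List<String>, Integer> MUTATING = words -> {
        words.add("extra");
        return words.size();
    };

    // Hypothetical consumer that only reads its input.
    static final Function<List<String>, Integer> READ_ONLY = List::size;

    // A "fused" stage: the same object reference goes to every sibling, with no copy.
    static List<Integer> runFused(List<String> element,
                                  List<Function<List<String>, Integer>> siblings) {
        List<Integer> results = new ArrayList<>();
        for (Function<List<String>, Integer> sibling : siblings) {
            results.add(sibling.apply(element));
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> element = new ArrayList<>(Arrays.asList("a", "b"));
        // The read-only sibling runs second and observes the first sibling's mutation.
        System.out.println(runFused(element, Arrays.asList(MUTATING, READ_ONLY))); // prints [3, 3]
    }
}
```

With the siblings in the opposite order the same input yields [2, 3], so the read-only consumer's result depends on an execution order that the model does not promise.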

> And a second question: How would I implement a DISTINCT transformation on
> a PCollection?
>
>
We already have a Distinct transformation you can use, but you can also dive
into the source code to see how it works!
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java
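
For readers who want the general idea before opening the source, a distinct step can be thought of as: key each element by itself, group by key, and emit each key once. The sketch below shows that idea in plain Java on an in-memory list. It is not the Beam implementation; DistinctSketch and distinct are hypothetical names, and it assumes the element type has sensible equals and hashCode.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the idea behind a distinct step; not the Beam source.
class DistinctSketch {

    static <T> List<T> distinct(List<T> input) {
        // Steps 1 and 2: key each element by itself and group by key,
        // so duplicates collapse into a single group (the count is incidental).
        Map<T, Integer> groups = new LinkedHashMap<>();
        for (T element : input) {
            groups.merge(element, 1, Integer::sum);
        }
        // Step 3: emit each key exactly once.
        return new ArrayList<>(groups.keySet());
    }

    public static void main(String[] args) {
        System.out.println(distinct(Arrays.asList("a", "b", "a", "c", "b"))); // prints [a, b, c]
    }
}
```

The LinkedHashMap is used here only to keep the printed order stable; a distributed runner makes no ordering promise for the output of a grouping step.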


>
>
> Thanks in advance,
>
> Rico.
>