Posted to dev@beam.apache.org by Kenneth Knowles <kl...@google.com.INVALID> on 2017/09/06 07:50:25 UTC

Side inputs, state, outputs

The scope of the side input proposal expanded to include discussion of
state and outputs. I didn't want to pollute the other thread with this, but
I do want to emphasize what is different about these.

PCollectionView: read-only, time-evolving, per window, automatic GC based
on WindowMappingFn(s), independent of key and transform, with only
data-dependency for happens-before. It is a value, basically. Adding a
get() method with indirection to some mutated-into-place context might not
be too bad.
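
A minimal sketch of that indirection, in plain Java with no Beam on the
classpath. Only the SideInputAccessor name comes from Eugene's proposal;
View (standing in for PCollectionView) and ToyRunner are made up for
illustration, and a real runner would look quite different.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the SideInputAccessor interface named in the proposal.
interface SideInputAccessor {
  <T> T get(View<T> view);
}

// Hypothetical stand-in for PCollectionView; this is not the Beam class.
final class View<T> {
  // Set by the "runner" around user code; empty outside such a context.
  static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();

  // Reads this view's value through whatever accessor is installed on this thread.
  T get() {
    SideInputAccessor accessor = CURRENT.get();
    if (accessor == null) {
      throw new IllegalStateException("No side input context on this thread");
    }
    return accessor.get(this);
  }
}

// Hypothetical toy runner: installs the accessor, calls user code, then clears it.
final class ToyRunner {
  static <I, O> O process(Map<View<?>, Object> values, Function<I, O> fn, I input) {
    SideInputAccessor accessor = new SideInputAccessor() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T get(View<T> view) {
        // Only views declared up front (think .withSideInputs) are readable.
        if (!values.containsKey(view)) {
          throw new IllegalArgumentException("Side input was not declared");
        }
        return (T) values.get(view);
      }
    };
    View.CURRENT.set(accessor);
    try {
      return fn.apply(input);
    } finally {
      View.CURRENT.remove();
    }
  }
}
```

User code only ever calls get() on the view it captured; the context is
installed and cleared by the toy runner, so outside of process() the call
fails rather than returning a stale value.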

State: read/write, per transform+key+window, trivial concurrency control,
automatic GC. It is a naive mutable field, basically. Instead of StateSpec
separate from State, you could probably put the read/write methods on the
declaration and hack a mutated-into-place context. A bit gross but maybe
there's a usable API in that idea somewhere.
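
To make the "read/write methods on the declaration" idea concrete, here is
a rough sketch in plain Java. ValueCell and ToyStateRunner are hypothetical
names, not Beam APIs, and windows are left out entirely; the only thing it
tries to show is a declaration whose read() and write() go through a
context that the runner swaps in per key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical: a state declaration that also carries read()/write().
final class ValueCell<T> {
  // The mutated-into-place context: backing store for the current key on this thread.
  static final ThreadLocal<Map<ValueCell<?>, Object>> CURRENT = new ThreadLocal<>();

  private final T initial;

  ValueCell(T initial) {
    this.initial = initial;
  }

  // Returns the value stored for the current key, or the initial value if none.
  @SuppressWarnings("unchecked")
  T read() {
    Map<ValueCell<?>, Object> store = store();
    return store.containsKey(this) ? (T) store.get(this) : initial;
  }

  // Stores a value for the current key.
  void write(T value) {
    store().put(this, value);
  }

  private static Map<ValueCell<?>, Object> store() {
    Map<ValueCell<?>, Object> s = CURRENT.get();
    if (s == null) {
      throw new IllegalStateException("No state context on this thread");
    }
    return s;
  }
}

// Hypothetical toy runner: keeps one store per key and installs it around user code.
final class ToyStateRunner {
  private final Map<String, Map<ValueCell<?>, Object>> perKey = new HashMap<>();

  void processElement(String key, Runnable body) {
    ValueCell.CURRENT.set(perKey.computeIfAbsent(key, k -> new HashMap<>()));
    try {
      body.run();
    } finally {
      ValueCell.CURRENT.remove();
    }
  }
}
```

The same cell object reads differently depending on which key the runner
is processing, which is exactly the part that feels a bit gross.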

Output: write-only, obviously comes from a particular transform. It is a
control-inverted return value, basically. Omitted from the new DoFn
proposal for simplicity, but outputs make sense as intrinsic read-only
fields of the DoFn if they are not dynamic. One could imagine designs in
which a PCollection is just a Receiver/Consumer so you can capture it in
your closure and write to it without plumbing (and likely with concurrency
control) but that's entirely unexplored AFAIK.

So I agree with Eugene's initial point that only side inputs make sense
independent of transform, until more investigation has taken place.

Kenn


On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov <
kirpichov@google.com.invalid> wrote:

> Hm, I guess you're right - for outputs it could indeed be quite valuable to
> output to them without plumbing (e.g. outputting errors). Could be done
> perhaps via TupleTag.output()? (assuming the same TupleTag cannot be
> reused to tag multiple PCollections)
>
> For now I sent a PR for side input support
> https://github.com/apache/beam/pull/3814 .
>
> On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > I disagree. State may not care where it is used either: a person may call
> > a function which needs to store/retrieve state, and that function could get
> > at it directly instead of having the DoFn declare the StateSpec and then
> > pass the state implementation down into the function everywhere. Similarly
> > for outputs, the internal functions could take the TupleTag and request an
> > output manager, or take an "output" reference which gives functions the
> > ability to produce output directly without needing to pass everything that
> > needs to be output back to the caller.
> >
> > On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov <
> > kirpichov@google.com.invalid> wrote:
> >
> > > Hm, I think that of these things (state, side outputs etc.), only side
> > > inputs make sense to access in arbitrary user callbacks without explicit
> > > knowledge of the surrounding transform - so only side inputs can be
> > > implicit like this.
> > >
> > > Ultimately we'll probably end up removing ProcessContext, and keeping
> > > only annotations (on fields / methods / parameters). In that world, a
> > > field annotation could be used (as in my previous email) to statically
> > > specify which side inputs will be needed - while the value could still be
> > > accessed via .get(), just like state cells are accessed via .read() and
> > > .write(): i.e., #get() is not a new method of access.
> > >
> > > Overall, it seems like I should proceed with the idea. I filed
> > > https://issues.apache.org/jira/browse/BEAM-2844.
> > >
> > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid>
> > > wrote:
> > >
> > > > For API consistency reasons, it would be good if we did this
> > > > holistically and expanded this approach to state, side outputs, ... so
> > > > that a person can always call Something.get() to return something that
> > > > they can access implementation-wise. It will be confusing for our users
> > > > to have many variations in our style of how all these concepts are used
> > > > (ProcessContext / Annotations / #get()).
> > > >
> > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <
> > > > kirpichov@google.com.invalid> wrote:
> > > >
> > > > > Also, I think my approach is compatible with annotations and future
> > > > > removal of .withSideInputs if we annotate a field:
> > > > > final PCollectionView<Foo> fooView = ...;
> > > > >
> > > > > class MyDoFn {
> > > > >   @SideInput
> > > > >   PCollectionView<Foo> foo = fooView;
> > > > >
> > > > >   ...foo.get()...
> > > > > }
> > > > >
> > > > > We can extract the accessed views from the DoFn instance using
> > > > > reflection. Still not compatible with lambdas, but compatible
> > > > > automatically with all anonymous classes.
> > > > >
> > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <kirpichov@google.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Luke,
> > > > > >
> > > > > > I know this (annotations) is the pattern we were considering for
> > > > > > side inputs, but I no longer think it is the best way to access
> > > > > > them. Annotations help get rid of the .withSideInputs() call, but
> > > > > > this is where their advantage ends.
> > > > > >
> > > > > > The advantage of the proposed approach is that it automatically
> > > > > > works with all existing callback or lambda code. No need to further
> > > > > > develop the reflection machinery to support side input annotations
> > > > > > - and especially to support arbitrary user interfaces, no need to
> > > > > > change existing transforms, no need for transform authors to even
> > > > > > know that the machinery exists to make side inputs usable in their
> > > > > > transforms (and no need for authors to think about whether or not
> > > > > > they should support side inputs).
> > > > > >
> > > > > > Moreover, like Reuven says, annotations don't work with lambdas at
> > > > > > all: creating a lambda with a flexible set of annotation arguments
> > > > > > appears to be currently impossible, and even capturing the
> > > > > > annotations on arguments of a lambda is, I believe, also impossible
> > > > > > because the Java compiler drops them in the generated class or
> > > > > > method handle.
> > > > > >
> > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik <lcwik@google.com.invalid>
> > > > > > wrote:
> > > > > >
> > > > > >> I believe we should follow the pattern that state uses and add a
> > > > > >> type annotation to link the side input definition to its usage
> > > > > >> directly. This would allow us to know that the side input was
> > > > > >> definitely being accessed and perform validation during graph
> > > > > >> construction for any used but unspecified side inputs.
> > > > > >>
> > > > > >> Code snippet:
> > > > > >> final PCollectionView<String> foo = pipeline
> > > > > >>     .apply("fooName", Create.of("foo"))
> > > > > >>     .apply(View.<String>asSingleton());
> > > > > >> PCollection<String> output = pipeline
> > > > > >>     .apply(Create.of(1, 2, 3))
> > > > > >>     .apply(MapElements.via(
> > > > > >>         new SimpleFunction<Integer, String>() {
> > > > > >>           @Override
> > > > > >>           public String apply(
> > > > > >>               Integer input, @SideInput("fooName") String fooValue) {
> > > > > >>             return fooValue + " " + input;
> > > > > >>           }
> > > > > >>         }).withSideInputs(foo));
> > > > > >>
> > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <
> > > > > >> kirpichov@google.com.invalid> wrote:
> > > > > >>
> > > > > >> > Sure, here's what a modified (passing) MapElements unit test
> > > > > >> > looks like, with usage of side inputs:
> > > > > >> >
> > > > > >> >   @Test
> > > > > >> >   @Category(NeedsRunner.class)
> > > > > >> >   public void testMapBasicWithSideInput() throws Exception {
> > > > > >> >     final PCollectionView<String> foo =
> > > > > >> >         pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
> > > > > >> >     PCollection<String> output = pipeline
> > > > > >> >         .apply(Create.of(1, 2, 3))
> > > > > >> >         .apply(MapElements.via(
> > > > > >> >             new SimpleFunction<Integer, String>() {
> > > > > >> >               @Override
> > > > > >> >               public String apply(Integer input) {
> > > > > >> >                 return foo.get() + " " + input;
> > > > > >> >               }
> > > > > >> >             })
> > > > > >> >         .withSideInputs(foo));
> > > > > >> >
> > > > > >> >     PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
> > > > > >> >     pipeline.run();
> > > > > >> >   }
> > > > > >> >
> > > > > >> >
> > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <relax@google.com.invalid>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > Can you provide a code snippet showing how this would look?
> > > > > >> > >
> > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <
> > > > > >> > > kirpichov@google.com.invalid> wrote:
> > > > > >> > >
> > > > > >> > > > TL;DR Introduce method PCollectionView.get(), implemented
> > > > > >> > > > as: get thread-local ProcessContext and do
> > > > > >> > > > c.sideInput(this). As a result, any user lambdas, such as
> > > > > >> > > > those passed to MapElements, can use side inputs.
> > > > > >> > > >
> > > > > >> > > > Quite a few transforms have user-code callbacks or lambdas:
> > > > > >> > > > ParDo (DoFn), Map/FlatMapElements, the DynamicDestinations
> > > > > >> > > > classes in various IOs, combine fns, the PollFn callback in
> > > > > >> > > > Watch, etc.
> > > > > >> > > >
> > > > > >> > > > Of these, only DoFn and CombineFn have built-in support for
> > > > > >> > > > side inputs; for DynamicDestinations it is plumbed
> > > > > >> > > > explicitly; others don't have access (e.g. you can't access
> > > > > >> > > > side inputs from Map/FlatMapElements because they don't
> > > > > >> > > > have a ProcessContext or any context for that matter).
> > > > > >> > > >
> > > > > >> > > > I think it's important to solve this, especially as Java 8
> > > > > >> > > > becomes people's default choice: users will want to use
> > > > > >> > > > side inputs in Map/FlatMapElements.
> > > > > >> > > >
> > > > > >> > > > It also appears to be quite easy to implement:
> > > > > >> > > >
> > > > > >> > > > Runner part:
> > > > > >> > > > - introduce a SideInputAccessor interface
> > > > > >> > > > - make .get() on a PCollectionView get it from a
> > > > > >> > > >   thread-local SideInputAccessor
> > > > > >> > > > - make runners set the thread-local SideInputAccessor any
> > > > > >> > > >   time the runner is evaluating something in a context
> > > > > >> > > >   where side inputs are available, e.g. a ProcessElement
> > > > > >> > > >   method, or applying a CombineFn - the set of such places
> > > > > >> > > >   will be quite small. I believe only runners (but not
> > > > > >> > > >   transforms) will ever need to set this thread-local
> > > > > >> > > >
> > > > > >> > > > Transform part:
> > > > > >> > > > - Transforms that take user-code lambdas and want to let
> > > > > >> > > >   them access side inputs will still need to be
> > > > > >> > > >   configurable with a method like
> > > > > >> > > >   .withSideInputs(view1, view2...) and will need to plumb
> > > > > >> > > >   those down to the primitive DoFns or CombineFns -
> > > > > >> > > >   information on *which* side inputs will be accessed, of
> > > > > >> > > >   course, still needs to be in the graph.
> > > > > >> > > >
> > > > > >> > > > I quickly prototyped this in the direct runner and
> > > > > >> > > > MapElements, and it worked with no issues - I'm wondering
> > > > > >> > > > if there's something subtle that I'm missing, or is it
> > > > > >> > > > actually a good idea?
> > > > > >> > > >