Posted to dev@beam.apache.org by Stephan Ewen <se...@apache.org> on 2016/05/13 11:12:37 UTC

Using Side Inputs to Join with Static Data Sets

Hi!

Aljoscha and I have been going through the side inputs quite a bit, and we
were wondering about the following:

How does one properly join a static data set with a stream?

This sounds like a job for a side input, but would require that the side
input materializes the initial static data before the main input can begin
processing.

Given that the static data set is in a global window, and the Beam side
inputs only wait for the first element in the window to be available, the
main input would start joining against the side input prematurely.

Is this simply considered an uncommon use case, or is there a way to
realize this that we overlooked?

Greetings,
Stephan

Re: Using Side Inputs to Join with Static Data Sets

Posted by Aljoscha Krettek <al...@apache.org>.
Ah, this piece of code (the DataflowPipelineRunner) explains why I was
talking about the continuation trigger in our earlier discussions on side
inputs. If you have code like this:

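// Side-input path: window, combine globally, and expose the result as a view.
PCollection<> input = ...
PCollectionView<> view = input
  .apply(Window.into(...))
  .apply(Combine.globally(...).asSingletonView());

PCollection<> input2 = ...

// Main-input path: the ParDo reads the view as a side input.
input2
  .apply(ParDo.withSideInputs(view).of(...));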

the continuation trigger of the Trigger specified in the Window.into() will
decide when the side input is ready by controlling (implicitly) when the
"Combine.globally(Concatenate)" fires.

I wasn't aware of this inserted Combine.globally(Concatenate) so I thought
this logic was handled by the SideInputDoFnRunner directly. I think,
however, that the behavior would be the same if the SideInputDoFnRunner had
the Trigger and would perform the Concatenate internally.

On Fri, 13 May 2016 at 18:24 Kenneth Knowles <kl...@google.com.invalid> wrote:

> I think it may help to unpack the override & expansion of View.asXYZ() in
> the DataflowPipelineRunner [1] and the InProcessPipelineRunner [2]
>
> Each of these does:
>
> 1. some preparation, perhaps
> 2. concatenate the side input PColl into a single iterable (there's a GBK
> here; triggering)
> 3. materialize that iterable in a runner-specific way (for you, perhaps
> this is a write to a BroadcastVariable)
>
> So most everything said on this thread is right, but it is helpful to
> distinguish elements of the incoming PCollection being viewed from elements
> written to the side input in step (3) because of triggering in step (2).
> This is why in the global window you don't see the side input window ready
> when the first element arrives, but when the whole window triggers.
>
> This expansion is not something we envision being owned by the SDK, longer
> term, but is probably the best way to go right now.
>
> [1]
>
> https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java#L2732
> [2]
>
> https://github.com/apache/incubator-beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java#L103
>
> Kenn

Re: Using Side Inputs to Join with Static Data Sets

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
I think it may help to unpack the override & expansion of View.asXYZ() in
the DataflowPipelineRunner [1] and the InProcessPipelineRunner [2]

Each of these does:

1. some preparation, perhaps
2. concatenate the side input PColl into a single iterable (there's a GBK
here; triggering)
3. materialize that iterable in a runner-specific way (for you, perhaps
this is a write to a BroadcastVariable)

So most everything said on this thread is right, but it is helpful to
distinguish elements of the incoming PCollection being viewed from elements
written to the side input in step (3) because of triggering in step (2).
This is why in the global window you don't see the side input window ready
when the first element arrives, but when the whole window triggers.
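
The distinction between steps (2) and (3) can be sketched in plain Java. This
is only a toy model, not the Beam SDK or any runner's code: the class
ViewExpansionSketch and its methods are hypothetical names, and the trigger is
reduced to an explicit method call. It shows that elements arriving on the
side-input path do not make the view ready; only the pane emitted when the
trigger fires does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy model of the view expansion described above; hypothetical names, not Beam API.
class ViewExpansionSketch {
    // Step 2: elements of the viewed PCollection are buffered until the trigger fires.
    private final List<String> buffered = new ArrayList<>();
    // Step 3: the materialized side input; empty until a pane has been written.
    private Optional<List<String>> materialized = Optional.empty();

    // An element arrives on the side-input path. This alone does not make the view ready.
    void onElement(String element) {
        buffered.add(element);
    }

    // The trigger fires. The buffer is emitted as one iterable and written to
    // the (here: in-memory) runner-specific store.
    void onTriggerFired() {
        materialized = Optional.of(new ArrayList<>(buffered));
    }

    boolean isReady() {
        return materialized.isPresent();
    }

    List<String> read() {
        return materialized.orElseThrow(
            () -> new IllegalStateException("side input not ready"));
    }

    public static void main(String[] args) {
        ViewExpansionSketch view = new ViewExpansionSketch();
        view.onElement("a");
        System.out.println("ready after first element: " + view.isReady());
        view.onElement("b");
        view.onTriggerFired();
        System.out.println("ready after trigger: " + view.isReady());
        System.out.println("contents: " + view.read());
    }
}
```

In a real runner the store in step (3) would be something runner-specific,
such as the BroadcastVariable mentioned above.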

This expansion is not something we envision being owned by the SDK, longer
term, but is probably the best way to go right now.

[1]
https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java#L2732
[2]
https://github.com/apache/incubator-beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java#L103

Kenn

On Fri, May 13, 2016 at 8:48 AM, Lukasz Cwik <lc...@google.com.invalid>
wrote:

> A side input window becomes "ready" as soon as a trigger has fired
> producing data within the PCollection which the side input view would be
> over.
>
> For example, if the side input window is in the global window with an after
> watermark trigger, it will fire once when all the data has been processed
> along the side input path since the watermark will go from negative
> infinity to positive infinity. This is the canonical way to load a
> static dataset to use as a side input for streaming. Generally, the main
> input will need to block till at least one pane has been output into the
> side input PCollection.

Re: Using Side Inputs to Join with Static Data Sets

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
A side input window becomes "ready" as soon as a trigger has fired
producing data within the PCollection which the side input view would be
over.

For example, if the side input window is in the global window with an after
watermark trigger, it will fire once when all the data has been processed
along the side input path since the watermark will go from negative
infinity to positive infinity. This is the canonical way to load a
static dataset to use as a side input for streaming. Generally, the main
input will need to block till at least one pane has been output into the
side input PCollection.
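
As a rough illustration of the blocking behavior, here is a toy model in plain
Java. It is not Beam or runner code: BlockingJoinSketch and its methods are
hypothetical, and the static data set is assumed to arrive as a single pane
(one Map). Main-input elements that arrive before that pane are held back and
joined once it has been written.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model of a runner holding back the main input; hypothetical names, not Beam API.
class BlockingJoinSketch {
    private Map<String, String> sideInput;  // null until the first pane is written
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();

    // A main-input element arrives: join it now if the side input is ready,
    // otherwise hold it back.
    void onMainElement(String key) {
        if (sideInput == null) {
            pending.add(key);
        } else {
            output.add(join(key));
        }
    }

    // The side-input trigger fired and produced a pane: store it and release
    // every element that was held back, in arrival order.
    void onSideInputPane(Map<String, String> pane) {
        sideInput = new HashMap<>(pane);
        while (!pending.isEmpty()) {
            output.add(join(pending.remove()));
        }
    }

    private String join(String key) {
        return key + "=" + sideInput.getOrDefault(key, "<no match>");
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BlockingJoinSketch runner = new BlockingJoinSketch();
        runner.onMainElement("user1");  // arrives early, is held back

        Map<String, String> staticData = new HashMap<>();
        staticData.put("user1", "DE");
        staticData.put("user2", "US");
        runner.onSideInputPane(staticData);  // watermark reached the end of the window

        runner.onMainElement("user2");
        runner.onMainElement("user3");
        System.out.println(runner.output());  // [user1=DE, user2=US, user3=<no match>]
    }
}
```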


On Fri, May 13, 2016 at 7:01 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> in streaming, side input for a window is considered ready as soon as at
> least one element is ready; this is the same for all kinds of side inputs,
> i.e. List, Map, Singleton. This means that successive main-input elements
> can see a different side input List if more side-input elements keep
> arriving. Side input is also never scoped to a key, but always global
> (broadcast), that is if you have a Map you get the whole Map<K, V> from
> your c.sideInput() call.
>
> At least that's what I gathered from discussions on the ML with Kenneth.
> And that's why Stephan and I were wondering about the "correctness
> guarantees" that this gives and whether this is enough for most common use
> cases.
>
> Cheers,
> Aljoscha

Re: Using Side Inputs to Join with Static Data Sets

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
in streaming, side input for a window is considered ready as soon as at
least one element is ready; this is the same for all kinds of side inputs,
i.e. List, Map, Singleton. This means that successive main-input elements
can see a different side input List if more side-input elements keep
arriving. Side input is also never scoped to a key, but always global
(broadcast), that is if you have a Map you get the whole Map<K, V> from
your c.sideInput() call.
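
To make the concern concrete, here is a toy model in plain Java (not Beam
code; EvolvingSideInputSketch and its methods are hypothetical). It assumes
the side input fires more than once and that each pane carries everything
seen so far. Two main-input elements processed at different times then read
different Lists, and each read returns the whole collection, not a per-key
slice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model of a List side input that is replaced whenever a new pane fires;
// hypothetical names, not Beam API.
class EvolvingSideInputSketch {
    private List<String> latestPane = null;

    // A new pane arrives; assumed here to contain all elements seen so far.
    void onSideInputPane(List<String> accumulatedElements) {
        latestPane = Collections.unmodifiableList(new ArrayList<>(accumulatedElements));
    }

    // What a main-input element would read at this moment: always the whole List.
    List<String> sideInput() {
        if (latestPane == null) {
            throw new IllegalStateException("side input not ready");
        }
        return latestPane;
    }

    public static void main(String[] args) {
        EvolvingSideInputSketch sketch = new EvolvingSideInputSketch();

        sketch.onSideInputPane(Arrays.asList("a"));
        List<String> seenByFirst = sketch.sideInput();

        sketch.onSideInputPane(Arrays.asList("a", "b", "c"));
        List<String> seenBySecond = sketch.sideInput();

        System.out.println("first main element sees " + seenByFirst.size() + " element(s)");
        System.out.println("second main element sees " + seenBySecond.size() + " element(s)");
    }
}
```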

At least that's what I gathered from discussions on the ML with Kenneth.
And that's why Stephan and I were wondering about the "correctness
guarantees" that this gives and whether this is enough for most common use
cases.

Cheers,
Aljoscha

On Fri, 13 May 2016 at 14:44 Maximilian Michels <mx...@apache.org> wrote:

> Hi Stephan,
>
> As far as I understand, side inputs, by definition, always need to be
> "ready" before processing of any kind can start. What is considered
> ready depends on the type of side input. If you use View.asList() or
> View.asSingleton() then the whole side input needs to be materialized.
> On the other hand, if you use View.asIterable(), processing can start
> once the first element arrives.
>
> If the side input itself is windowed, then the notion of "ready" only
> applies to the individual windows. Side Input itself can also be
> scoped by key if you use the View.asMap() or View.asMultimap() side
> input views.
>
> From a quick look at the InProcessRunner it appears that processing
> does not start until the side input of the window is ready. Beam
> experts, please correct me if I got this wrong.
>
> Cheers,
> Max

Re: Using Side Inputs to Join with Static Data Sets

Posted by Maximilian Michels <mx...@apache.org>.
Hi Stephan,

As far as I understand, side inputs, by definition, always need to be
"ready" before processing of any kind can start. What is considered
ready depends on the type of side input. If you use View.asList() or
View.asSingleton() then the whole side input needs to be materialized.
On the other hand, if you use View.asIterable(), processing can start
once the first element arrives.

If the side input itself is windowed, then the notion of "ready" only
applies to the individual windows. Side Input itself can also be
scoped by key if you use the View.asMap() or View.asMultimap() side
input views.

From a quick look at the InProcessRunner it appears that processing
does not start until the side input of the window is ready. Beam
experts, please correct me if I got this wrong.

Cheers,
Max

On Fri, May 13, 2016 at 1:12 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
>
> Aljoscha and I have been going through the side inputs quite a bit, and we
> were wondering about the following:
>
> How does one properly join a static data set with a stream?
>
> This sounds like a job for a side input, but would require that the side
> input materializes the initial static data before the main input can begin
> processing.
>
> Given that the static data set is in a global window, and the Beam side
> inputs only wait for the first element in the window to be available, the
> main input would start joining against the side input prematurely.
>
> Is this simply considered an uncommon use case, or is there a way to
> realize this that we overlooked?
>
> Greetings,
> Stephan