Posted to user@beam.apache.org by Sachin Mittal <sj...@gmail.com> on 2023/08/28 15:29:59 UTC

How can we get multiple side inputs from a single pipeline ?

Hi,

I was checking the code for the side input patterns:

https://beam.apache.org/documentation/patterns/side-inputs/
Basically, I need multiple side inputs built with the "Slowly updating global
window side inputs" pattern.

Following the example, the pipeline looks something like this:

PCollectionView<Map<String, String>> map =
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
        .apply(
            ParDo.of(
                new DoFn<Long, Map<String, String>>() {
                  @ProcessElement
                  public void process(
                      @Element Long input,
                      @Timestamp Instant timestamp,
                      OutputReceiver<Map<String, String>> o) {
                    o.output(/* output a map */);
                    // also output another map and a list, is this possible?
                  }
                }))
        .apply(
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        .apply(Latest.globally())
        .apply(View.asSingleton());


As an extension of this example, I may also need another Map and a List,
alongside the map, from the same DoFn that fetches the side input. The
reason I need to do this in the same DoFn is that this function queries
external sources to build the side input, and the other side inputs are
built from the same source.

So I would like to avoid querying the external sources multiple times from
different DoFns, and instead use the same function to generate all of the
side inputs.

Can I achieve this by using "Tags for multiple outputs"?

Thanks
Sachin

Re: How can we get multiple side inputs from a single pipeline ?

Posted by Bruno Volpato via user <us...@beam.apache.org>.
Hi Sachin,

Yes, this seems fine to me: your DoFn can output to specific tags, and you
can then use PCollectionTuple.get(tagX), PCollectionTuple.get(tagY), followed
by View.asSingleton, View.asList, etc., to create the different
PCollectionView instances.
Just be careful: you might need different triggers for each view. The one
you linked works well for a singleton, but may not be appropriate for
producing lists.

Let us know how it goes!
Best,
Bruno



Re: How can we get multiple side inputs from a single pipeline ?

Posted by Reuven Lax via user <us...@beam.apache.org>.
This looks fine. One caveat: there currently appears to be a bug in Beam
when you apply a combiner followed by View.asSingleton. I would
recommend replacing these lines:

.apply(Latest.globally())
.apply(View.asSingleton())

with the following:

.apply(Reify.timestamps())
.apply(Combine.globally(Latest.combineFn()).asSingletonView())
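To see the replacement in isolation, here is a small bounded sketch, assuming the Beam Java SDK with the DirectRunner on the classpath; the class name, element values and timestamps are illustrative only. It reifies the timestamps, combines them with Latest.combineFn() into a singleton view, and reads that view as a side input from a second PCollection. The static RESULTS queue is just a convenience for inspecting output on a single-JVM runner, not a pattern for production pipelines.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public class LatestSingletonViewDemo {
  // Filled by the DoFn below; a static sink only works on a single-JVM runner.
  static final Queue<String> RESULTS = new ConcurrentLinkedQueue<>();

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Three "snapshots" with event-time timestamps; v3 carries the latest timestamp.
    PCollection<String> snapshots =
        p.apply("Snapshots", Create.timestamped(
            TimestampedValue.of("v1", new Instant(1000L)),
            TimestampedValue.of("v3", new Instant(3000L)),
            TimestampedValue.of("v2", new Instant(2000L))));

    // The replacement for Latest.globally() followed by View.asSingleton().
    PCollectionView<String> latest =
        snapshots
            .apply(Reify.<String>timestamps())
            .apply(Combine.globally(Latest.<String>combineFn()).asSingletonView());

    // A main input that reads the singleton side input.
    p.apply("Main", Create.of("a", "b"))
        .apply(ParDo.of(new DoFn<String, Void>() {
              @ProcessElement
              public void process(ProcessContext c) {
                RESULTS.add(c.element() + ":" + c.sideInput(latest));
              }
            }).withSideInputs(latest));

    p.run().waitUntilFinish();
  }
}
```

With these inputs, each main element should be paired with "v3", the value carrying the greatest timestamp, regardless of the order in which the snapshots were created.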
