Posted to user@beam.apache.org by Vilhelm von Ehrenheim <vo...@gmail.com> on 2018/04/16 11:15:35 UTC

Multiple firings on side input

Hi!
I have a side input with streaming updates in a global window. I have tried
to approach this in several ways but can't figure out how to do it. What I
really need is a side-input Map that is updated when the streaming input
changes (i.e. when keys are updated).

I have tried to implement this with a View.asMap transform, but got an error
saying that I have duplicate keys in my set (which are there due to multiple
triggered updates on the PCollection). I then tried to do it as a singleton,
using a global CombineFn to build the map and
Combine.globally().asSingletonView() instead. But then I got the error:
java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.

How are you supposed to do this? In the documentation there is a part that
suggests that this should be possible:

If the side input has multiple trigger firings, Beam uses the value from
the latest trigger firing. This is particularly useful if you use a side
input with a single global window and specify a trigger.

Thanks!

// Vilhelm von Ehrenheim

Re: Multiple firings on side input

Posted by Vilhelm von Ehrenheim <vo...@gmail.com>.
OK, thanks! I'll give it a shot.

By the way, maybe that part of the docs should be removed until retractions
are in place? It made me quite sure it should work for multiple firings.

Again, thanks for the help!

// Vilhelm

On Mon, 16 Apr 2018, 19:10 Kenneth Knowles, <kl...@google.com> wrote:

> [...]

Re: Multiple firings on side input

Posted by Kenneth Knowles <kl...@google.com>.
Hi Vilhelm,

This is a known issue in the Beam model. Trigger firings should
automatically update downstream results, but instead they are treated as
new elements. The design for retractions will alleviate this problem. But
you can work around it yourself in specific cases like this.
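
A minimal plain-Java analogy (not Beam code) of why a map-shaped view trips over repeated firings, assuming an accumulating trigger that fires twice for key "a": both firings reach downstream as separate elements, so building a one-value-per-key map from them fails. The names DuplicateKeysDemo, FIRINGS and asUniqueMap are hypothetical, and Collectors.toMap only stands in for View.asMap by analogy; it throws IllegalStateException, not Beam's own error.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateKeysDemo {

  // Hypothetical output of two firings of an accumulating trigger for key "a".
  // Each firing reaches downstream as a separate element; the second one does
  // not replace the first.
  static final List<Map.Entry<String, Integer>> FIRINGS = List.of(
      Map.entry("a", 1),  // first firing: running count so far
      Map.entry("a", 3)); // second firing: updated running count

  // Analogy for a map-shaped view: one value per key. Collectors.toMap throws
  // IllegalStateException as soon as a key repeats.
  static Map<String, Integer> asUniqueMap(List<Map.Entry<String, Integer>> elements) {
    return elements.stream()
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args) {
    try {
      asUniqueMap(FIRINGS);
      System.out.println("built a map");
    } catch (IllegalStateException e) {
      System.out.println("duplicate key rejected");
    }
  }
}
```

Running main prints "duplicate key rejected": neither firing is treated as an update of the other.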

You can use View.asMultimap(), which means each trigger firing for a key will
add a new element to the collection of values (an Iterable) for that key. In
order to distinguish the latest one, you will need to do some manual
preparation.

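    import java.util.Map;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    // This is your triggered input; there will be duplicate keys
    PCollection<KV<MyKey, MyValue>> input = ...

    // Here I am just making up the types you need to implement to keep the
    // index of the trigger firing (MyKey, MyValue, MyValuePlusSequenceNumber)
    PCollectionView<Map<MyKey, Iterable<MyValuePlusSequenceNumber>>> sideInput =
        input
            .apply(ParDo.of(
                new DoFn<KV<MyKey, MyValue>, KV<MyKey, MyValuePlusSequenceNumber>>() {
                  @ProcessElement
                  public void process(ProcessContext ctx) {
                    // Tag each value with the index of the pane (trigger firing)
                    // that produced it, so the latest firing can be found later.
                    ctx.output(
                        KV.of(
                            ctx.element().getKey(),
                            new MyValuePlusSequenceNumber(
                                ctx.element().getValue(),
                                ctx.pane().getIndex())));
                  }
                }))
            .apply(View.asMultimap());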

This will allow you to grab all the trigger firings associated with a
particular key and find the last one (the one with the highest pane index).
It is not ideal, either in clarity or performance, but it can work for some
cases until we have retraction support.
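
To make the "find the last one" step concrete, here is a minimal plain-Java sketch with no Beam dependency, assuming the side input for a key arrives as an Iterable of values tagged with their pane index. Indexed is a hypothetical stand-in for the made-up MyValuePlusSequenceNumber, and toMultimap only mimics the shape of a multimap view; it is not how Beam builds one. The selection scans for the highest pane index instead of trusting iteration order, since nothing here assumes the values come back in firing order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LatestFiring {

  // Hypothetical stand-in for MyValuePlusSequenceNumber: a value tagged with
  // the pane index of the trigger firing that produced it.
  static final class Indexed<V> {
    final V value;
    final long paneIndex;

    Indexed(V value, long paneIndex) {
      this.value = value;
      this.paneIndex = paneIndex;
    }
  }

  // Convenience helper for one (key, tagged value) element.
  static <K, V> Map.Entry<K, Indexed<V>> firing(K key, V value, long paneIndex) {
    return Map.entry(key, new Indexed<V>(value, paneIndex));
  }

  // Mimics the shape of a multimap view: every firing for a key is kept,
  // appended to that key's list instead of replacing earlier values.
  static <K, V> Map<K, List<Indexed<V>>> toMultimap(List<Map.Entry<K, Indexed<V>>> elements) {
    Map<K, List<Indexed<V>>> multimap = new LinkedHashMap<>();
    for (Map.Entry<K, Indexed<V>> e : elements) {
      multimap.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    return multimap;
  }

  // Returns the value with the highest pane index, regardless of iteration order.
  static <V> Optional<V> latest(Iterable<Indexed<V>> firings) {
    Indexed<V> best = null;
    for (Indexed<V> f : firings) {
      if (best == null || f.paneIndex > best.paneIndex) {
        best = f;
      }
    }
    return best == null ? Optional.empty() : Optional.of(best.value);
  }

  public static void main(String[] args) {
    // The later firing for "a" happens to be listed first.
    Map<String, List<Indexed<String>>> view = toMultimap(List.of(
        firing("a", "a-v1", 1),
        firing("b", "b-v0", 0),
        firing("a", "a-v0", 0)));
    System.out.println(latest(view.get("a")).orElse("none")); // prints a-v1
  }
}
```

In a pipeline, a scan like latest(...) would be applied to the Iterable that the side-input map returns for a given key.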

Apologies for typos or broken code here, as I am just typing it in email
without checking its compilation or behavior.

Kenn

On Mon, Apr 16, 2018 at 4:15 AM Vilhelm von Ehrenheim <vonehrenheim@gmail.com> wrote:

> [...]