Posted to user@beam.apache.org by Aaron Dixon <at...@gmail.com> on 2019/11/06 05:29:10 UTC

Multiple triggers contained w/in side input?

From https://beam.apache.org/documentation/programming-guide/#side-inputs

> If the side input has multiple trigger firings, Beam uses the value from
> the latest trigger firing. This is particularly useful if you use a side
> input with a single global window and specify a trigger.

I have this sub-pipeline:

-> GlobalWindow (triggering AfterProcessingTime.pastFirstElementInPane)
-> Combine.perKey (Max)
-> View.asMap
...which I use as a side input.

But I get a "Duplicate values for <key>" error (DirectRunner). (Stack trace
below.)

But the only way for duplicate keys to come out of the global window is via
multiple triggers.

What am I missing?



===
java.lang.IllegalArgumentException: Duplicate values for :ihop
    at org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:397)
    at org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:373)
    at org.apache.beam.runners.direct.SideInputContainer$SideInputContainerSideInputReader.get(SideInputContainer.java:261)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:247)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:74)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:548)

Re: Multiple triggers contained w/in side input?

Posted by Aaron Dixon <at...@gmail.com>.
Oops, you already did clarify. Thank you. I see the ticket you filed.
(Sorry, it's late.) Thanks again.


Re: Multiple triggers contained w/in side input?

Posted by Aaron Dixon <at...@gmail.com>.
I'm sorry for any alarms.  My pseudocode should have read:

GlobalWindow (triggering *Repeatedly.forever* AfterProcessingTime.pastFirstElementInPane)

I suspect that this "latest trigger firing" behavior does not apply to my
usage, i.e., View.asMap applied to the output of a Combine.perKey.

Perhaps it only applies to views of Combine.globally. Could you confirm or
clarify?

Thanks.


Re: Multiple triggers contained w/in side input?

Posted by Kenneth Knowles <ke...@apache.org>.
Good questions,

On Wed, Nov 6, 2019 at 12:59 AM rahul patwari <ra...@gmail.com>
wrote:

> Hi Kenn,
>
> Does the side input have elements from the previous trigger firing even
> when used with .discardingFiredPanes(), as in the following pattern?
> https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-global-window-side-inputs
>
>

Yes, the elements from the previous trigger firing will still be there. Only
their values will differ between the two accumulation modes. Suppose:

 - you are doing a Sum and the inputs are 1, 2, 3, 4
 - you trigger after 1, 2 and then trigger again after 3, 4

There will always be two elements in the output, and two elements go into
the side input. The elements will be:

 - discardingFiredPanes: 3, 7
 - accumulatingFiredPanes: 3, 10
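
To make the arithmetic above concrete, here is a minimal plain-Java sketch that simulates the two accumulation modes for a Sum. It does not use Beam at all; the class name PaneSums and the method firePanes are made up for illustration, and the model (one running sum, one output per firing) is an assumption that only mirrors the example in this message.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the panes emitted by a Sum under repeated trigger firings. */
final class PaneSums {

  /**
   * @param firings the inputs that arrive before each trigger firing, in order
   * @param accumulating true models accumulatingFiredPanes, false models discardingFiredPanes
   * @return one sum per firing; each of these ends up as a separate side input element
   */
  static List<Integer> firePanes(List<List<Integer>> firings, boolean accumulating) {
    List<Integer> outputs = new ArrayList<>();
    int state = 0; // running sum held for the window
    for (List<Integer> arrivals : firings) {
      for (int value : arrivals) {
        state += value;
      }
      outputs.add(state); // the trigger fires and emits a pane
      if (!accumulating) {
        state = 0; // discarding mode clears the state after each firing
      }
    }
    return outputs;
  }

  public static void main(String[] args) {
    // Inputs 1, 2 arrive before the first firing and 3, 4 before the second.
    List<List<Integer>> firings = List.of(List.of(1, 2), List.of(3, 4));
    System.out.println("discarding:   " + firePanes(firings, false)); // [3, 7]
    System.out.println("accumulating: " + firePanes(firings, true));  // [3, 10]
  }
}
```

In both modes the returned list has two entries, which is the point: neither mode replaces the earlier pane, so the side input sees both values.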

> Does View.asSingleton() affect this behaviour?
>

View.asSingleton() will crash if there are multiple trigger firings on a
Combine.globally() or Sum.globally(), etc., just as View.asMap() will
crash if there are multiple trigger firings on a per-key Combine/Sum/etc.
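
As a rough illustration of why a singleton view cannot cope with more than one firing, the following plain-Java sketch models a side input as a list of elements and refuses to read it as a single value unless it holds exactly one. This is not Beam's implementation; SingletonViewSketch, asSingleton and the error text are hypothetical names chosen for the example.

```java
import java.util.List;

/** Hypothetical model of reading a side input as a single value. */
final class SingletonViewSketch {

  /** Returns the only element, or throws if the side input does not hold exactly one. */
  static <T> T asSingleton(List<T> sideInputElements) {
    if (sideInputElements.size() != 1) {
      throw new IllegalArgumentException(
          "expected exactly one element but found " + sideInputElements.size());
    }
    return sideInputElements.get(0);
  }

  public static void main(String[] args) {
    // One firing of a global Sum: a single element, so the read succeeds.
    System.out.println(asSingleton(List.of(10)));

    // Two firings (accumulating mode in the example above): two elements, so the read fails.
    try {
      asSingleton(List.of(3, 10));
    } catch (IllegalArgumentException e) {
      System.out.println("failed: " + e.getMessage());
    }
  }
}
```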


> Thanks,
> Rahul
>

Re: Multiple triggers contained w/in side input?

Posted by rahul patwari <ra...@gmail.com>.
Hi Kenn,

Does the side input have elements from the previous trigger firing even when
used with .discardingFiredPanes(), as in the following pattern?
https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-global-window-side-inputs


Does View.asSingleton() affect this behaviour?

Thanks,
Rahul


Re: Multiple triggers contained w/in side input?

Posted by Kenneth Knowles <ke...@apache.org>.
On Tue, Nov 5, 2019 at 9:29 PM Aaron Dixon <at...@gmail.com> wrote:

> From https://beam.apache.org/documentation/programming-guide/#side-inputs
>
> > If the side input has multiple trigger firings, Beam uses the value
> > from the latest trigger firing. This is particularly useful if you use a
> > side input with a single global window and specify a trigger.
>

Sorry for this. The documentation is entirely wrong. If a side input has
multiple firings, then all elements from all firings are included in the
side input as though they are unrelated elements. I have filed
https://issues.apache.org/jira/browse/BEAM-8563 to at least prevent the
confusing result and give a good error message.
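
A small plain-Java sketch of what that means for a map view: if each firing of a per-key combine contributes its own (key, value) element and all of them land in the side input, building a map with one value per key fails on the second firing. The code below is only a model, not Beam's code; MapViewSketch and asMap are hypothetical names, and the values 5 and 9 are invented for the example (only the key :ihop and the message wording come from the stack trace).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of building a one-value-per-key map from side input elements. */
final class MapViewSketch {

  /** Builds the map, rejecting any key that appears in more than one element. */
  static <K, V> Map<K, V> asMap(List<Map.Entry<K, V>> sideInputElements) {
    Map<K, V> view = new HashMap<>();
    for (Map.Entry<K, V> element : sideInputElements) {
      if (view.containsKey(element.getKey())) {
        throw new IllegalArgumentException("Duplicate values for " + element.getKey());
      }
      view.put(element.getKey(), element.getValue());
    }
    return view;
  }

  public static void main(String[] args) {
    // After one firing: a single element per key, so the map builds fine.
    List<Map.Entry<String, Integer>> oneFiring = List.of(Map.entry(":ihop", 5));
    System.out.println(asMap(oneFiring));

    // After two firings: both per-key outputs are present as unrelated elements.
    List<Map.Entry<String, Integer>> twoFirings =
        List.of(Map.entry(":ihop", 5), Map.entry(":ihop", 9));
    try {
      asMap(twoFirings);
    } catch (IllegalArgumentException e) {
      System.out.println("failed: " + e.getMessage());
    }
  }
}
```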

> I have this sub-pipeline:
>
> -> GlobalWindow (triggering AfterProcessingTime.pastFirstElementInPane)
>

Are you sure you want this? It will drop all data after the first firing.
We are about to disable such triggers due to the data loss risk. See
https://s.apache.org/finishing-triggers-drop-data. If your intent is to
drop all subsequent data, I am interested in your use case. Can you share
more?


> -> Combine.perKey (Max)
> -> View.asMap
> ...which I use as a side input.
>
> But I get a "Duplicate values for <key>" error (DirectRunner). (Stack
> trace below.)
>
> But the only way for duplicate keys to come out of the global window is
> via multiple triggers.
>
> What am I missing?
>

This is surprising. Can you share the actual code of your pipeline?
According to your pseudocode, this is impossible. The trigger you described
should never fire multiple times. But as I mentioned above, the trigger is
about to be forbidden. If we can learn about your usage, maybe that will
help.

Kenn


>
>
>
> ===
> java.lang.IllegalArgumentException: Duplicate values for :ihop
>     at org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:397)
>     at org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:373)
>     at org.apache.beam.runners.direct.SideInputContainer$SideInputContainerSideInputReader.get(SideInputContainer.java:261)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:247)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:74)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:548)
>
>
>