Posted to dev@beam.apache.org by Eugene Kirpichov <ki...@google.com> on 2017/12/04 23:19:49 UTC

Guarding against unsafe triggers at construction time

Hi,

After a recent investigation of a data loss bug caused by unintuitive
behavior of some kinds of triggers, we had a discussion about how we can
protect against future issues like this, and I summarized it in
https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:

Current Beam trigger semantics are rather confusing and in some cases
extremely unsafe, especially if the pipeline includes multiple chained
GBKs. One example of that is
https://issues.apache.org/jira/browse/BEAM-3169 .

There are multiple issues:

The API allows users to specify terminating top-level triggers (e.g.
"trigger a pane after receiving 10000 elements in the window, and that's
it"), but experience from user support shows that this is nearly always a
mistake and the user did not intend to drop all further data.
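
To illustrate the failure mode, here is a minimal sketch in plain Java. It is
a toy model of a single window, not Beam code: the class and method names are
hypothetical, and it assumes the trigger fires exactly once, when the n-th
element arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of one window under a terminating "after n elements" trigger. Not Beam code. */
class TerminatingTriggerSketch {
  /** Returns the elements of the only pane; everything arriving after the firing is dropped. */
  static List<Integer> onlyPane(List<Integer> arrivals, int n) {
    List<Integer> pane = new ArrayList<>();
    for (int element : arrivals) {
      if (pane.size() == n) {
        break; // the trigger has fired and finished: no further panes, no error, no log line
      }
      pane.add(element);
    }
    return pane;
  }

  public static void main(String[] args) {
    List<Integer> arrivals = Arrays.asList(1, 2, 3, 4, 5);
    List<Integer> pane = onlyPane(arrivals, 2);
    System.out.println("emitted " + pane + ", dropped " + (arrivals.size() - pane.size()));
  }
}
```

With five arrivals and n = 2, three elements never reach the output, and
nothing in the pipeline reports it.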

In general, triggers are the only place in Beam where data is being dropped
without making a lot of very loud noise about it - a practice for which the
PTransform style guide uses the language: "never, ever, ever do this".

Continuation triggers are worse still. For context: a continuation trigger
is the trigger that's set on the output of a GBK and controls further
aggregation of the results of this aggregation by downstream GBKs. The
output shouldn't just use the same trigger as the input, because e.g. if
the input trigger said "wait for an hour before emitting a pane", that
doesn't mean that we should wait for another hour before emitting a result
of aggregating the result of the input trigger. Continuation triggers try
to simulate the behavior "as if a pane of the input propagated through the
entire pipeline", but the implementation of individual continuation
triggers doesn't do that. E.g. the continuation of the "first N elements in
pane" trigger is "first 1 element in pane", and if the results of a first
GBK are further grouped by a second GBK onto a coarser key (e.g. if
everything is grouped onto the same key), that effectively means that, of
the keys of the first GBK, only one survives and all others are dropped
(this is what happened in the data loss bug).
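
The scenario can be sketched with a toy model in plain Java (not the Beam
API; names are hypothetical). It assumes the first GBK has already produced
one sum per key, and that the second GBK regroups all of them onto a single
key under a terminating "first `count` elements in pane" trigger.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of a second GBK that regroups per-key results onto a single key. Not Beam code. */
class ContinuationTriggerSketch {
  /**
   * Sums the per-key results that make it into the single downstream pane.
   * The downstream trigger fires after {@code count} elements and then finishes,
   * so results arriving later are dropped.
   */
  static int downstreamSum(List<Integer> perKeyResults, int count) {
    int sum = 0;
    int seen = 0;
    for (int result : perKeyResults) {
      if (seen == count) {
        break; // trigger already finished: the remaining keys never reach the output
      }
      sum += result;
      seen++;
    }
    return sum;
  }

  public static void main(String[] args) {
    List<Integer> perKeySums = Arrays.asList(3, 5, 1); // output of the first GBK, one value per key
    System.out.println(downstreamSum(perKeySums, 1));                 // continuation "first 1 element"
    System.out.println(downstreamSum(perKeySums, perKeySums.size())); // what the user expected
  }
}
```

With a continuation of "first 1 element", only the first key's result
contributes to the downstream total; the other two are lost.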

The ultimate fix to all of these things is
https://s.apache.org/beam-sink-triggers . However, it is a huge model
change, and meanwhile we have to do something. The options are, in order of
increasing backward incompatibility (but incompatibility in a "rejecting
something that previously was accepted but extremely dangerous" kind of
way):

   - *Make the continuation trigger of most triggers be the "always-fire"
   trigger.* Seems that this should be the case for all triggers except the
   watermark trigger. This will definitely increase safety, but lead to more
   eager firing of downstream aggregations. It also will violate a user's
   expectation that a fire-once trigger fires everything downstream only once,
   but that expectation appears impossible to satisfy safely.
   - *Make the continuation trigger of some triggers be the "invalid"
   trigger,* i.e. require the user to set it explicitly: there's in general
   no good and safe way to infer what a trigger on a second GBK "truly" should
   be, based on the trigger of the PCollection input into a first GBK. This is
   especially true for terminating triggers.
   - *Prohibit top-level terminating triggers entirely.* This will ensure
   that the only data that ever gets dropped is "droppably late" data.
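
As a rough illustration of the third option, a construction-time check could
look like the sketch below. Everything here is hypothetical (the ToyTrigger
interface, its mayFinish() method and the validation helper are invented for
the example and are not Beam types); it only shows the idea of rejecting the
pipeline when it is built rather than dropping data when it runs.

```java
/** Hypothetical toy trigger model for a construction-time check; none of these types are Beam's. */
interface ToyTrigger {
  /** True if the trigger can finish, after which further data in the window is dropped. */
  boolean mayFinish();
}

/** Fires once after some number of elements, then finishes. */
class AfterCount implements ToyTrigger {
  public boolean mayFinish() {
    return true;
  }
}

/** Restarts the wrapped trigger after each firing, so it never finishes. */
class RepeatedlyForever implements ToyTrigger {
  private final ToyTrigger inner;

  RepeatedlyForever(ToyTrigger inner) {
    this.inner = inner;
  }

  public boolean mayFinish() {
    return false;
  }
}

class TriggerValidation {
  /** Rejects a terminating top-level trigger while the pipeline is being built. */
  static ToyTrigger requireNonTerminating(ToyTrigger topLevel) {
    if (topLevel.mayFinish()) {
      throw new IllegalArgumentException(
          "Top-level trigger may finish and silently drop data; wrap it in a repeated trigger");
    }
    return topLevel;
  }
}
```

In this model, `requireNonTerminating(new AfterCount())` throws, while
`requireNonTerminating(new RepeatedlyForever(new AfterCount()))` is accepted.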


Do people think that these options are sensible?
+Kenn Knowles <kl...@google.com> +Thomas Groh <tg...@google.com> +Ben Chambers
<bc...@google.com> is this a fair summary of our discussion?

Thanks!

Re: Guarding against unsafe triggers at construction time

Posted by Kenneth Knowles <kl...@google.com>.
My own thoughts inline on the three ideas discussed.

On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <ki...@google.com>
wrote:
>
> Continuation triggers are still worse. For context: continuation trigger
> is the trigger that's set on the output of a GBK and controls further
> aggregation of the results of this aggregation by downstream GBKs. The
> output shouldn't just use the same trigger as the input, because e.g. if
> the input trigger said "wait for an hour before emitting a pane", that
> doesn't mean that we should wait for another hour before emitting a result
> of aggregating the result of the input trigger. Continuation triggers try
> to simulate the behavior "as if a pane of the input propagated through the
> entire pipeline", but the implementation of individual continuation
> triggers doesn't do that. E.g. the continuation of "first N elements in
> pane" trigger is "first 1 element in pane", and if the results of a first
> GBK are further grouped by a second GBK onto more coarse key (e.g. if
> everything is grouped onto the same key), that effectively means that, of
> the keys of the first GBK, only one survives and all others are dropped
> (what happened in the data loss bug).
>

Additional flavor: the full version of the faulty spec for "once trigger"
is that the continuation trigger is a once trigger that fires once *after
awaiting all of the once-fired data from upstream*. This spec is not met -
the continuation trigger does not correctly wait for all upstream data.
FWIW we can still try to meet that spec while still forbidding dropping - a
correct design would have the "once firing" effect but bugs would manifest
as an extra firing instead of data loss. But I also don't think the spec is
worth keeping.

> The ultimate fix to all of these things is
> https://s.apache.org/beam-sink-triggers . However, it is a huge model
> change, and meanwhile we have to do something. The options are, in order of
> increasing backward incompatibility (but incompatibility in a "rejecting
> something that previously was accepted but extremely dangerous" kind of
> way):
>
>    - *Make the continuation trigger of most triggers be the "always-fire"
>    trigger.* Seems that this should be the case for all triggers except
>    the watermark trigger. This will definitely increase safety, but lead to
>    more eager firing of downstream aggregations. It also will violate a user's
>    expectation that a fire-once trigger fires everything downstream only once,
>    but that expectation appears impossible to satisfy safely.
>
>
+1 This doesn't solve the issue, but I think we should do it in addition.
It makes the upstream trigger govern latency and we can define it to be
compatible with any trigger, letting the other trigger govern the output.
This will also let a three-way join work (or at least not crash!) when
expressed as successive binary joins. This is currently not true because the
continuation trigger of processing time is sync processing time, so the
trigger of the first binary join is judged incompatible with the remaining
input. Also, most runners do not implement synchronized processing time; its
utility is a bit questionable in this space anyhow.

Note: there are only three leaf triggers: processing time, count, and end
of window. Element count already has "always trigger" as a continuation,
but in a form that might appear incompatible with other triggers.
Processing time has the problematic sync processing time - since it already
doesn't work, let's just make it "always trigger". And leave EOW alone.
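
The proposed mapping for the three leaf triggers can be written down as a
small table. The sketch below is only a restatement of that proposal in plain
Java; the enum and its constants are illustrative names, not Beam classes.

```java
/** Illustrative summary of the proposed continuations; not Beam types. */
enum TriggerKind {
  PROCESSING_TIME,
  ELEMENT_COUNT,
  END_OF_WINDOW,
  ALWAYS_FIRE;

  /** Under the proposal, everything except end-of-window continues as "always fire". */
  TriggerKind continuation() {
    return this == END_OF_WINDOW ? END_OF_WINDOW : ALWAYS_FIRE;
  }
}
```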


>    - *Make the continuation trigger of some triggers be the "invalid"
>    trigger, *i.e. require the user to set it explicitly: there's in
>    general no good and safe way to infer what a trigger on a second GBK
>    "truly" should be, based on the trigger of the PCollection input into a
>    first GBK. This is especially true for terminating triggers.
>
>
-0.5 Meh from me. Triggers are already enough of a pain. Manual interventions
like this will be extra annoying and may often cross composite boundaries,
breaking abstractions.


>    - *Prohibit top-level terminating triggers entirely. *This will ensure
>    that the only data that ever gets dropped is "droppably late" data.
>
>
+1 I've wanted this for a long time, but it didn't make it in for v2. Given
the current behavior is data loss, I would be OK with doing it without a
major version bump. I know most people only want to fix code bugs in minor
versions, but I tend to also favor fixing semantic bugs like this. Anyone
who is currently not losing data will continue to not lose data.



> Do people think that these options are sensible?
> +Kenn Knowles <kl...@google.com> +Thomas Groh <tg...@google.com> +Ben
> Chambers <bc...@google.com> is this a fair summary of our discussion?
>

Yup.

Kenn

Re: Guarding against unsafe triggers at construction time

Posted by Thomas Groh <tg...@google.com>.
I'm in favor of option 3 in all cases, and in favor of option 2 if it's
considered to be "more backwards-compatible" than option 1.

For option 1, I'm in favor of at minimum making the continuation trigger for
all triggers be nonterminating. I'm not super bothered by how we accomplish
that, so long as we don't drop more data due to a continuation trigger.

On Mon, Dec 4, 2017 at 4:02 PM, Raghu Angadi <ra...@google.com> wrote:

> I have been thinking about this since last week's discussions about
> buffering in sinks and was reading https://s.apache.org/beam-sink-triggers. It
> says BEAM-3169 is an example of a bug caused by misunderstanding of trigger
> semantics.
>
>   - I would like to know which part of the (documented) trigger semantics
> it misunderstood. I personally didn't think about how triggers are
> propagated/enforced downstream. Always thought of them in the context of
> the current step. JavaDoc for triggers does not mention it explicitly.
>
>   - In addition, the fix for BEAM-3169 seems to be essentially using
> Reshuffle, which places elements in its own window. KafkaIO's exactly-once
> sink does something similar
> <https://github.com/apache/beam/blob/dedc5e8f25a560ad9d620cab468f417def1747fb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1930>.
> Is it a violation of triggers set upstream (as mentioned in a recent dev
> thread
> <https://lists.apache.org/thread.html/ebcb316edb85c6bb2c1024a8ac52f0647dab069310b217802b735962@%3Cdev.beam.apache.org%3E>
> )?
>
> I guess I am hoping for expanded JavaDoc to describe trigger semantics in
> much more detail (preferably with examples) so that users and developers
> can understand them better and not suffer from many subtle bugs. Best practices are
> useful, of course, and having users actually understand the right semantics
> is also very useful.
>
>

Re: Guarding against unsafe triggers at construction time

Posted by Raghu Angadi <ra...@google.com>.
I have been thinking about this since last week's discussions about
buffering in sinks and was reading https://s.apache.org/beam-sink-triggers. It
says BEAM-3169 is an example of a bug caused by misunderstanding of trigger
semantics.

  - I would like to know which part of the (documented) trigger semantics
it misunderstood. I personally didn't think about how triggers are
propagated/enforced downstream. Always thought of them in the context of
the current step. JavaDoc for triggers does not mention it explicitly.

  - In addition, the fix for BEAM-3169 seems to be essentially using Reshuffle,
which places elements in its own window. KafkaIO's exactly-once sink does
something similar
<https://github.com/apache/beam/blob/dedc5e8f25a560ad9d620cab468f417def1747fb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1930>.
Is it a violation of triggers set upstream (as mentioned in a recent dev
thread
<https://lists.apache.org/thread.html/ebcb316edb85c6bb2c1024a8ac52f0647dab069310b217802b735962@%3Cdev.beam.apache.org%3E>
)?

I guess I am hoping for expanded JavaDoc to describe trigger semantics in
much more detail (preferably with examples) so that users and developers
can understand them better and not suffer from many subtle bugs. Best
practices are useful, of course, and having users actually understand the
right semantics is also very useful.



Re: Guarding against unsafe triggers at construction time

Posted by Eugene Kirpichov <ki...@google.com>.
FWIW, things seem to work if I wrap the continuations of AfterPane and
AfterProcessingTime in Repeatedly.forever(): then, in DISCARDING mode, they
pass the property.
They still fail it in ACCUMULATING mode. I think ACCUMULATING mode is
currently just semantically broken for stacked aggregations and needs to be
forbidden.
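
To make the ACCUMULATING failure concrete, here is a minimal sketch of a toy
model in plain Java (not the Beam API; class and method names are
hypothetical). It assumes the first aggregation sums one key's values and
fires after every element, and that the downstream aggregation simply sums
whatever panes it receives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of a repeatedly-firing sum feeding a second sum. Not Beam code. */
class AccumulationModeSketch {
  /** Panes of a sum that fires after every element; accumulating panes carry the running total. */
  static List<Integer> sumPanes(List<Integer> input, boolean accumulating) {
    List<Integer> panes = new ArrayList<>();
    int running = 0;
    for (int x : input) {
      running += x;
      panes.add(accumulating ? running : x); // discarding panes hold only the new data
    }
    return panes;
  }

  /** A downstream sum that treats every upstream pane as new data. */
  static int downstreamTotal(List<Integer> panes) {
    int total = 0;
    for (int pane : panes) {
      total += pane;
    }
    return total;
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3);
    System.out.println(downstreamTotal(sumPanes(input, false))); // discarding upstream
    System.out.println(downstreamTotal(sumPanes(input, true)));  // accumulating upstream
  }
}
```

In discarding mode the downstream total matches the true sum of the input. In
accumulating mode each pane repeats earlier data, so the downstream total
overcounts.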

On Fri, Dec 8, 2017 at 1:10 PM Reuven Lax <re...@google.com> wrote:

> As I've mentioned before, I think OnceTriggers are generally a bad idea.
>
> On Fri, Dec 8, 2017 at 1:05 PM, Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> The property that stacking more GBKs doesn't drop more data than a first
>> one already drops, nor introduce duplicate data (see previous email,
>> starting with "I'd propose the following property:").
>>
>> On Fri, Dec 8, 2017 at 12:29 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Which intuitive property?
>>>
>>> On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> I created a test demonstrating that our triggers violate this pretty
>>>> intuitive property.
>>>>
>>>> https://github.com/apache/beam/pull/4239
>>>>
>>>> I think we should discuss this: seems like a pretty important property
>>>> to me and seems pretty bad that it's violated. Again, per discussion below,
>>>> some of it should be addressed with changing semantics of individual
>>>> triggers, and some with prohibiting certain combinations of triggers /
>>>> accumulation modes / GBKs.
>>>>
>>>> I tried changing the continuation trigger of some triggers to be the
>>>> "always fire" trigger, however we have a lot of code assuming that it will
>>>> be a OnceTrigger (e.g. that continuation of a OnceTrigger has to be a
>>>> OnceTrigger, and of course that components of a OnceTrigger have to be a
>>>> OnceTrigger) and I didn't yet dig into whether that division is at all
>>>> important or just baked in at compile time.
>>>>
>>>> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>> > Hi,
>>>>>> >
>>>>>> > After a recent investigation of a data loss bug caused by
>>>>>> unintuitive
>>>>>> > behavior of some kinds of triggers, we had a discussion about how
>>>>>> we can
>>>>>> > protect against future issues like this, and I summarized it in
>>>>>> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>>>>> >
>>>>>> > Current Beam trigger semantics are rather confusing and in some
>>>>>> cases
>>>>>> > extremely unsafe, especially if the pipeline includes multiple
>>>>>> chained GBKs.
>>>>>> > One example of that is
>>>>>> https://issues.apache.org/jira/browse/BEAM-3169 .
>>>>>> >
>>>>>> > There's multiple issues:
>>>>>> >
>>>>>> > The API allows users to specify terminating top-level triggers (e.g.
>>>>>> > "trigger a pane after receiving 10000 elements in the window, and
>>>>>> that's
>>>>>> > it"), but experience from user support shows that this is nearly
>>>>>> always a
>>>>>> > mistake and the user did not intend to drop all further data.
>>>>>> >
>>>>>> > In general, triggers are the only place in Beam where data is being
>>>>>> dropped
>>>>>> > without making a lot of very loud noise about it - a practice for
>>>>>> which the
>>>>>> > PTransform style guide uses the language: "never, ever, ever do
>>>>>> this".
>>>>>> >
>>>>>> > Continuation triggers are still worse. For context: continuation
>>>>>> trigger is
>>>>>> > the trigger that's set on the output of a GBK and controls further
>>>>>> > aggregation of the results of this aggregation by downstream GBKs.
>>>>>> The
>>>>>> > output shouldn't just use the same trigger as the input, because
>>>>>> e.g. if the
>>>>>> > input trigger said "wait for an hour before emitting a pane", that
>>>>>> doesn't
>>>>>> > mean that we should wait for another hour before emitting a result
>>>>>> of
>>>>>> > aggregating the result of the input trigger. Continuation triggers
>>>>>> try to
>>>>>> > simulate the behavior "as if a pane of the input propagated through
>>>>>> the
>>>>>> > entire pipeline", but the implementation of individual continuation
>>>>>> triggers
>>>>>> > doesn't do that. E.g. the continuation of "first N elements in
>>>>>> pane" trigger
>>>>>> > is "first 1 element in pane", and if the results of a first GBK are
>>>>>> further
>>>>>> > grouped by a second GBK onto more coarse key (e.g. if everything is
>>>>>> grouped
>>>>>> > onto the same key), that effectively means that, of the keys of the
>>>>>> first
>>>>>> > GBK, only one survives and all others are dropped (what happened in
>>>>>> the data
>>>>>> > loss bug).
>>>>>> >
>>>>>> > The ultimate fix to all of these things is
>>>>>> > https://s.apache.org/beam-sink-triggers . However, it is a huge
>>>>>> model
>>>>>> > change, and meanwhile we have to do something. The options are, in
>>>>>> order of
>>>>>> > increasing backward incompatibility (but incompatibility in a
>>>>>> "rejecting
>>>>>> > something that previously was accepted but extremely dangerous"
>>>>>> kind of
>>>>>> > way):
>>>>>> >
>>>>>> > Make the continuation trigger of most triggers be the "always-fire"
>>>>>> trigger.
>>>>>> > Seems that this should be the case for all triggers except the
>>>>>> watermark
>>>>>> > trigger. This will definitely increase safety, but lead to more
>>>>>> eager firing
>>>>>> > of downstream aggregations. It also will violate a user's
>>>>>> expectation that a
>>>>>> > fire-once trigger fires everything downstream only once, but that
>>>>>> > expectation appears impossible to satisfy safely.
>>>>>>
>>>>>> Note that firing more often for multiply stacked triggers, especially
>>>>>> in the case of accumulating mode, easily leads to data corruption bugs
>>>>>> where elements are duplicated and/or overcounted. I suppose that bad
>>>>>> data is slightly easier to detect than missing data, but automatically
>>>>>> turning once triggers into more-than-once triggers (automatically, or
>>>>>> manually by prohibiting the former) isn't a straightforward fix.
>>>>>>
>>>>>
>>>>> I agree that the combination of accumulating mode and stacked GBKs is
>>>>> very problematic.
>>>>> I think that the way it's defined now is a semantic bug that produces
>>>>> garbage (duplicated data) with any repeated trigger, but garbage (lost
>>>>> data, per current discussion) with any non-repeated trigger as well.
>>>>>
>>>>> I think we should prohibit the combination of accumulating mode and
>>>>> stacked GBKs until we sort this out, which is again probably Sink Triggers:
>>>>> per that document, I suppose, the accumulating mode would be set only on
>>>>> the sink step, and upstream steps would get discarding mode [and downstream
>>>>> steps would have to set something explicitly].
>>>>>
>>>>> By "prohibit" I mean "GBK applied to something with accumulating mode
>>>>> should return a PCollection with an invalid trigger, where you have to
>>>>> explicitly configure a trigger before you can GBK again". Does this sound
>>>>> reasonable?
>>>>>
>>>>> If we agree on that, then I think having the continuation of all
>>>>> triggers be the "always-fire" trigger is also safe?
>>>>>
>>>>>
>>>>>>
>>>>>> > Make the continuation trigger of some triggers be the "invalid"
>>>>>> trigger,
>>>>>> > i.e. require the user to set it explicitly: there's in general no
>>>>>> good and
>>>>>> > safe way to infer what a trigger on a second GBK "truly" should be,
>>>>>> based on
>>>>>> > the trigger of the PCollection input into a first GBK. This is
>>>>>> especially
>>>>>> > true for terminating triggers.
>>>>>>
>>>>>> I think this should be on the table until we come up with good
>>>>>> semantics, which may not be until sink triggers. It's backwards
>>>>>> incompatible, but in a bug-fixing and verbose way (as opposed to silently
>>>>>> changing semantics/behavior/outputs with the first option). Note that
>>>>>> sometimes for composite operations one does not have access to the
>>>>>> second grouping operation, so we must at least provide an opt-out
>>>>>> flag.
>>>>>>
>>>>> I would argue that the failure in those cases is Working As Intended:
>>>>> the pipeline has undefined behavior and we're forcing the user to
>>>>> remove the ambiguity - either by specifying a repeated trigger (which they
>>>>> probably should have done in the first place) or by specifying the next
>>>>> trigger explicitly (which, granted, may require changing the composite
>>>>> transform, because the transform hasn't accounted for potential usage in
>>>>> this ambiguous case).
>>>>>
>>>>>
>>>>>>
>>>>>> > Prohibit top-level terminating triggers entirely. This will ensure
>>>>>> that the
>>>>>> > only data that ever gets dropped is "droppably late" data.
>>>>>>
>>>>>> The AfterWatermark.withEarlyFirings(...) is a top-level terminating
>>>>>> trigger that we should keep allowing, and has perfectly reasonable
>>>>>> continuation semantics. I suppose we could only allow it if allowed
>>>>>> lateness is 0, but that makes things even more context sensitive.
>>>>>>
>>>>>> > Do people think that these options are sensible?
>>>>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of
>>>>>> our
>>>>>> > discussion?
>>>>>>
>>>>>> I think this is a fair summary of the discussions we've had.
>>>>>>
>>>>>
>>>>> Do we have any sort of trigger test suite, or high-level properties
>>>>> triggers must satisfy?
>>>>>
>>>>> I'd propose the following property:
>>>>> Given a PCollection with a particular trigger that is being passed
>>>>> through a GBK, processing through the downstream aggregations should by
>>>>> default happen without dropping or introducing any more data [unless a
>>>>> triggering strategy is explicitly set on a downstream aggregation to
>>>>> something that violates this property].
>>>>>
>>>>> For example:
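>>>>> PCollection<V> pc = ...  // e.g. TestStream
>>>>>
>>>>> // Key each element randomly in [0, n), group, then flatten back.
>>>>> <V> PCollection<V> regroup(PCollection<V> pc, int n) {
>>>>>   SerializableFunction<V, Integer> randomKey =
>>>>>       v -> ThreadLocalRandom.current().nextInt(n);
>>>>>   return pc
>>>>>       .apply(WithKeys.of(randomKey).withKeyType(TypeDescriptors.integers()))
>>>>>       .apply(GroupByKey.create())
>>>>>       .apply(Values.create())
>>>>>       .apply(Flatten.iterables());
>>>>> }
>>>>>
>>>>> Then regroup(regroup(pc, n), 1) == regroup(pc, n), regardless of pc's
>>>>> triggering, windowing and accumulation mode.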
>>>>>
>>>>> This property is currently definitely violated for some triggers; and
>>>>> it's definitely violated for the accumulating mode; both aspects must be
>>>>> fixed.
>>>>>
>>>>>
>>>>>> - Robert
>>>>>>
>>>>>
>>>
>

Re: Guarding against unsafe triggers at construction time

Posted by Reuven Lax <re...@google.com>.
As I've mentioned before, I think OnceTriggers are generally a bad idea.

On Fri, Dec 8, 2017 at 1:05 PM, Eugene Kirpichov <ki...@google.com>
wrote:

> The property that stacking more GBKs doesn't drop more data than a first
> one already drops, nor introduce duplicate data (see previous email,
> starting with "I'd propose the following property:").
>
> On Fri, Dec 8, 2017 at 12:29 PM Reuven Lax <re...@google.com> wrote:
>
>> Which intuitive property?
>>
>> On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> I created a test demonstrating that our triggers violate this pretty
>>> intuitive property.
>>>
>>> https://github.com/apache/beam/pull/4239
>>>
>>> I think we should discuss this: seems like a pretty important property
>>> to me and seems pretty bad that it's violated. Again, per discussion below,
>>> some of it should be addressed with changing semantics of individual
>>> triggers, and some with prohibiting certain combinations of triggers /
>>> accumulation modes / GBKs.
>>>
>>> I tried changing the continuation trigger of some triggers to be the
>>> "always fire" trigger, however we have a lot of code assuming that it will
>>> be a OnceTrigger (e.g. that continuation of a OnceTrigger has to be a
>>> OnceTrigger, and of course that components of a OnceTrigger have to be a
>>> OnceTrigger) and I didn't yet dig into whether that division is at all
>>> important or just baked in at compile time.
>>>
>>> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>> > Hi,
>>>>> >
>>>>> > After a recent investigation of a data loss bug caused by unintuitive
>>>>> > behavior of some kinds of triggers, we had a discussion about how we
>>>>> can
>>>>> > protect against future issues like this, and I summarized it in
>>>>> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>>>> >
>>>>> > Current Beam trigger semantics are rather confusing and in some cases
>>>>> > extremely unsafe, especially if the pipeline includes multiple
>>>>> chained GBKs.
>>>>> > One example of that is
>>>>> > https://issues.apache.org/jira/browse/BEAM-3169 .
>>>>> >
>>>>> > There's multiple issues:
>>>>> >
>>>>> > The API allows users to specify terminating top-level triggers (e.g.
>>>>> > "trigger a pane after receiving 10000 elements in the window, and
>>>>> that's
>>>>> > it"), but experience from user support shows that this is nearly
>>>>> always a
>>>>> > mistake and the user did not intend to drop all further data.
>>>>> >
>>>>> > In general, triggers are the only place in Beam where data is being
>>>>> dropped
>>>>> > without making a lot of very loud noise about it - a practice for
>>>>> which the
>>>>> > PTransform style guide uses the language: "never, ever, ever do
>>>>> this".
>>>>> >
>>>>> > Continuation triggers are still worse. For context: continuation
>>>>> trigger is
>>>>> > the trigger that's set on the output of a GBK and controls further
>>>>> > aggregation of the results of this aggregation by downstream GBKs.
>>>>> The
>>>>> > output shouldn't just use the same trigger as the input, because
>>>>> e.g. if the
>>>>> > input trigger said "wait for an hour before emitting a pane", that
>>>>> doesn't
>>>>> > mean that we should wait for another hour before emitting a result of
>>>>> > aggregating the result of the input trigger. Continuation triggers
>>>>> try to
>>>>> > simulate the behavior "as if a pane of the input propagated through
>>>>> the
>>>>> > entire pipeline", but the implementation of individual continuation
>>>>> triggers
>>>>> > doesn't do that. E.g. the continuation of "first N elements in pane"
>>>>> trigger
>>>>> > is "first 1 element in pane", and if the results of a first GBK are
>>>>> further
>>>>> > grouped by a second GBK onto more coarse key (e.g. if everything is
>>>>> grouped
>>>>> > onto the same key), that effectively means that, of the keys of the
>>>>> first
>>>>> > GBK, only one survives and all others are dropped (what happened in
>>>>> the data
>>>>> > loss bug).
>>>>> >
>>>>> > The ultimate fix to all of these things is
>>>>> > https://s.apache.org/beam-sink-triggers . However, it is a huge
>>>>> model
>>>>> > change, and meanwhile we have to do something. The options are, in
>>>>> order of
>>>>> > increasing backward incompatibility (but incompatibility in a
>>>>> "rejecting
>>>>> > something that previously was accepted but extremely dangerous" kind
>>>>> of
>>>>> > way):
>>>>> >
>>>>> > Make the continuation trigger of most triggers be the "always-fire"
>>>>> trigger.
>>>>> > Seems that this should be the case for all triggers except the
>>>>> watermark
>>>>> > trigger. This will definitely increase safety, but lead to more
>>>>> eager firing
>>>>> > of downstream aggregations. It also will violate a user's
>>>>> expectation that a
>>>>> > fire-once trigger fires everything downstream only once, but that
>>>>> > expectation appears impossible to satisfy safely.
>>>>>
>>>>> Note that firing more often for multiply stacked triggers, especially
>>>>> in the case of accumulating mode, easily leads to data corruption bugs
>>>>> where elements are duplicated and/or overcounted. I suppose that bad
>>>>> data is slightly easier to detect than missing data, but automatically
>>>>> turning once triggers into more-than-once triggers (automatically, or
>>>>> manually by prohibiting the former) isn't a straightforward fix.
>>>>>
>>>>
>>>> I agree that the combination of accumulating mode and stacked GBKs is
>>>> very problematic.
>>>> I think that the way it's defined now is a semantic bug that produces
>>>> garbage (duplicated data) with any repeated trigger, but garbage (lost
>>>> data, per current discussion) with any non-repeated trigger as well.
>>>>
>>>> I think we should prohibit the combination of accumulating mode and
>>>> stacked GBKs until we sort this out, which is again probably Sink Triggers:
>>>> per that document, I suppose, the accumulating mode would be set only on
>>>> the sink step, and upstream steps would get discarding mode [and downstream
>>>> steps would have to set something explicitly].
>>>>
>>>> By "prohibit" I mean "GBK applied to something with accumulating mode
>>>> should return a PCollection with an invalid trigger, where you have to
>>>> explicitly configure a trigger before you can GBK again". Does this sound
>>>> reasonable?
>>>>
>>>> If we agree on that, then I think having the continuation of all
>>>> triggers be the "always-fire" trigger is also safe?
>>>>
>>>>
>>>>>
>>>>> > Make the continuation trigger of some triggers be the "invalid"
>>>>> trigger,
>>>>> > i.e. require the user to set it explicitly: there's in general no
>>>>> good and
>>>>> > safe way to infer what a trigger on a second GBK "truly" should be,
>>>>> based on
>>>>> > the trigger of the PCollection input into a first GBK. This is
>>>>> especially
>>>>> > true for terminating triggers.
>>>>>
>>>>> I think this should be on the table until we come up with good
>>>>> semantics, which may not be until sink triggers. It's backwards
>>>>> incompatible, but in a bug-fixing and verbose way (as opposed to silently
>>>>> changing semantics/behavior/outputs with the first option). Note that
>>>>> sometimes for composite operations one does not have access to the
>>>>> second grouping operation, so we must at least provide an opt-out
>>>>> flag.
>>>>>
>>>> I would argue that the failure in those cases is Working As Intended:
>>>> the pipeline has undefined behavior and we're forcing the user to
>>>> remove the ambiguity - either by specifying a repeated trigger (which they
>>>> probably should have done in the first place) or by specifying the next
>>>> trigger explicitly (which, granted, may require changing the composite
>>>> transform, because the transform hasn't accounted for potential usage in
>>>> this ambiguous case)
>>>>
>>>>
>>>>>
>>>>> > Prohibit top-level terminating triggers entirely. This will ensure
>>>>> that the
>>>>> > only data that ever gets dropped is "droppably late" data.
>>>>>
>>>>> The AfterWatermark.withEarlyFirings(...) is a top-level terminating
>>>>> trigger that we should keep allowing, and has perfectly reasonable
>>>>> continuation semantics. I suppose we could only allow it if allowed
>>>>> lateness is 0, but that makes things even more context sensitive.
>>>>>
>>>>> > Do people think that these options are sensible?
>>>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of
>>>>> our
>>>>> > discussion?
>>>>>
>>>>> I think this is a fair summary of the discussions we've had.
>>>>>
>>>>
>>>> Do we have any sort of trigger test suite, or high-level properties
>>>> triggers must satisfy?
>>>>
>>>> I'd propose the following property:
>>>> Given a PCollection with a particular trigger, that is being passed
>>>> through a GBK, processing through the downstream aggregations by default
>>>> should happen without dropping or introducing any more data [unless a
>>>> triggering strategy is explicitly set on a downstream aggregation to
>>>> something that violates this property].
>>>>
>>>> For example:
>>>> PCollection<V> pc = ...  // e.g. TestStream
>>>>
>>>> PCollection<V> regroup(PCollection<V> pc, int n) {
>>>>   return pc.apply(WithKeys.of(random key in 0..n))
>>>>       .apply(GroupByKey.create())
>>>>       .apply(Values.create())
>>>>       .apply(Flatten.iterables());
>>>> }
>>>>
>>>> Then pc.regroup(n).regroup(1) == pc.regroup(n), regardless of pc's
>>>> triggering, windowing and accumulation mode.
>>>>
>>>> This property is currently definitely violated for some triggers; and
>>>> it's definitely violated for the accumulating mode; both aspects must be
>>>> fixed.
>>>>
>>>>
>>>>> - Robert
>>>>>
>>>>
>>

Re: Guarding against unsafe triggers at construction time

Posted by Eugene Kirpichov <ki...@google.com>.
The property that stacking more GBKs doesn't drop more data than a first
one already drops, nor introduce duplicate data (see previous email,
starting with "I'd propose the following property:").
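
To make the property concrete, here is a minimal sketch in plain Java (no
Beam dependency). It is a toy model, not Beam's implementation:
RegroupSketch, regroup, fireAfter and terminating are hypothetical names,
keys are assigned round-robin instead of randomly, and buffers whose trigger
never fired are assumed to be flushed at the end. A terminating count
trigger fires once per key and then silently drops later elements for that
key.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RegroupSketch {
    /**
     * Toy model of WithKeys + GroupByKey + Values + Flatten.iterables.
     * Each key buffers elements and fires a pane once it holds fireAfter of them.
     * If terminating is true, the key's trigger finishes after its first pane and
     * every later element for that key is dropped without any error.
     */
    static List<Integer> regroup(
            List<Integer> input, int numKeys, int fireAfter, boolean terminating) {
        Map<Integer, List<Integer>> buffers = new LinkedHashMap<>();
        Set<Integer> finished = new HashSet<>();
        List<Integer> output = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            int key = i % numKeys; // deterministic stand-in for a random key
            if (finished.contains(key)) {
                continue; // silent data loss: the trigger already finished
            }
            List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
            buffer.add(input.get(i));
            if (buffer.size() >= fireAfter) {
                output.addAll(buffer); // emit the pane
                buffer.clear();
                if (terminating) {
                    finished.add(key);
                }
            }
        }
        // Assumption of this toy model: unfired buffers are flushed at the end.
        for (Map.Entry<Integer, List<Integer>> e : buffers.entrySet()) {
            if (!finished.contains(e.getKey())) {
                output.addAll(e.getValue());
            }
        }
        return output;
    }
}
```

With 12 inputs, regroup(input, 3, 4, true) keeps all 12 elements. Feeding
that result into regroup(first, 1, 1, true), which mimics the "first 1
element in pane" continuation, leaves a single element, while a repeated
trigger (terminating = false) keeps all 12.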

On Fri, Dec 8, 2017 at 12:29 PM Reuven Lax <re...@google.com> wrote:

> Which intuitive property?
>
> On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> I created a test demonstrating that our triggers violate this pretty
>> intuitive property.
>>
>> https://github.com/apache/beam/pull/4239
>>
>> I think we should discuss this: seems like a pretty important property to
>> me and seems pretty bad that it's violated. Again, per discussion below,
>> some of it should be addressed with changing semantics of individual
>> triggers, and some with prohibiting certain combinations of triggers /
>> accumulation modes / GBKs.
>>
>> I tried changing the continuation trigger of some triggers to be the
>> "always fire" trigger, however we have a lot of code assuming that it will
>> be a OnceTrigger (e.g. that continuation of a OnceTrigger has to be a
>> OnceTrigger, and of course that components of a OnceTrigger have to be a
>> OnceTrigger) and I didn't yet dig into whether that division is at all
>> important or just baked in at compile time.
>>
>> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>> > Hi,
>>>> >
>>>> > After a recent investigation of a data loss bug caused by unintuitive
>>>> > behavior of some kinds of triggers, we had a discussion about how we
>>>> can
>>>> > protect against future issues like this, and I summarized it in
>>>> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>>> >
>>>> > Current Beam trigger semantics are rather confusing and in some cases
>>>> > extremely unsafe, especially if the pipeline includes multiple
>>>> chained GBKs.
>>>> > One example of that is
>>>> https://issues.apache.org/jira/browse/BEAM-3169 .
>>>> >
>>>> > There's multiple issues:
>>>> >
>>>> > The API allows users to specify terminating top-level triggers (e.g.
>>>> > "trigger a pane after receiving 10000 elements in the window, and
>>>> that's
>>>> > it"), but experience from user support shows that this is nearly
>>>> always a
>>>> > mistake and the user did not intend to drop all further data.
>>>> >
>>>> > In general, triggers are the only place in Beam where data is being
>>>> dropped
>>>> > without making a lot of very loud noise about it - a practice for
>>>> which the
>>>> > PTransform style guide uses the language: "never, ever, ever do this".
>>>> >
>>>> > Continuation triggers are still worse. For context: continuation
>>>> trigger is
>>>> > the trigger that's set on the output of a GBK and controls further
>>>> > aggregation of the results of this aggregation by downstream GBKs. The
>>>> > output shouldn't just use the same trigger as the input, because e.g.
>>>> if the
>>>> > input trigger said "wait for an hour before emitting a pane", that
>>>> doesn't
>>>> > mean that we should wait for another hour before emitting a result of
>>>> > aggregating the result of the input trigger. Continuation triggers
>>>> try to
>>>> > simulate the behavior "as if a pane of the input propagated through
>>>> the
>>>> > entire pipeline", but the implementation of individual continuation
>>>> triggers
>>>> > doesn't do that. E.g. the continuation of "first N elements in pane"
>>>> trigger
>>>> > is "first 1 element in pane", and if the results of a first GBK are
>>>> further
>>>> > grouped by a second GBK onto more coarse key (e.g. if everything is
>>>> grouped
>>>> > onto the same key), that effectively means that, of the keys of the
>>>> first
>>>> > GBK, only one survives and all others are dropped (what happened in
>>>> the data
>>>> > loss bug).
>>>> >
>>>> > The ultimate fix to all of these things is
>>>> > https://s.apache.org/beam-sink-triggers . However, it is a huge model
>>>> > change, and meanwhile we have to do something. The options are, in
>>>> order of
>>>> > increasing backward incompatibility (but incompatibility in a
>>>> "rejecting
>>>> > something that previously was accepted but extremely dangerous" kind
>>>> of
>>>> > way):
>>>> >
>>>> > Make the continuation trigger of most triggers be the "always-fire"
>>>> trigger.
>>>> > Seems that this should be the case for all triggers except the
>>>> watermark
>>>> > trigger. This will definitely increase safety, but lead to more eager
>>>> firing
>>>> > of downstream aggregations. It also will violate a user's expectation
>>>> that a
>>>> > fire-once trigger fires everything downstream only once, but that
>>>> > expectation appears impossible to satisfy safely.
>>>>
>>>> Note that firing more often for multiply stacked triggers, especially
>>>> in the case of accumulating mode, easily leads to data corruption bugs
>>>> where elements are duplicated and/or overcounted. I suppose that bad
>>>> data is slightly easier to detect than missing data, but automatically
>>>> turning once triggers into more-than-once triggers (automatically, or
>>>> manually by prohibiting the former) isn't a straightforward fix.
>>>>
>>>
>>> I agree that the combination of accumulating mode and stacked GBKs is
>>> very problematic.
>>> I think that the way it's defined now is a semantic bug that produces
>>> garbage (duplicated data) with any repeated trigger, but garbage (lost
>>> data, per current discussion) with any non-repeated trigger as well.
>>>
>>> I think we should prohibit the combination of accumulating mode and
>>> stacked GBKs until we sort this out, which is again probably Sink Triggers:
>>> per that document, I suppose, the accumulating mode would be set only on
>>> the sink step, and upstream steps would get discarding mode [and downstream
>>> steps would have to set something explicitly].
>>>
>>> By "prohibit" I mean "GBK applied to something with accumulating mode
>>> should return a PCollection with an invalid trigger, where you have to
>>> explicitly configure a trigger before you can GBK again". Does this sound
>>> reasonable?
>>>
>>> If we agree on that, then I think having the continuation of all
>>> triggers be the "always-fire" trigger is also safe?
>>>
>>>
>>>>
>>>> > Make the continuation trigger of some triggers be the "invalid"
>>>> trigger,
>>>> > i.e. require the user to set it explicitly: there's in general no
>>>> good and
>>>> > safe way to infer what a trigger on a second GBK "truly" should be,
>>>> based on
>>>> > the trigger of the PCollection input into a first GBK. This is
>>>> especially
>>>> > true for terminating triggers.
>>>>
>>>> I think this should be on the table until we come up with good
>>>> semantics, which may not be until sink triggers. It's backwards
>>>> incompatible, but in a bug-fixing and verbose way (as opposed to silently
>>>> changing semantics/behavior/outputs with the first option). Note that
>>>> sometimes for composite operations one does not have access to the
>>>> second grouping operation, so we must at least provide an opt-out
>>>> flag.
>>>>
>>> I would argue that the failure in those cases is Working As Intended:
>>> the pipeline has undefined behavior and we're forcing the user to
>>> remove the ambiguity - either by specifying a repeated trigger (which they
>>> probably should have done in the first place) or by specifying the next
>>> trigger explicitly (which, granted, may require changing the composite
>>> transform, because the transform hasn't accounted for potential usage in
>>> this ambiguous case)
>>>
>>>
>>>>
>>>> > Prohibit top-level terminating triggers entirely. This will ensure
>>>> that the
>>>> > only data that ever gets dropped is "droppably late" data.
>>>>
>>>> The AfterWatermark.withEarlyFirings(...) is a top-level terminating
>>>> trigger that we should keep allowing, and has perfectly reasonable
>>>> continuation semantics. I suppose we could only allow it if allowed
>>>> lateness is 0, but that makes things even more context sensitive.
>>>>
>>>> > Do people think that these options are sensible?
>>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our
>>>> > discussion?
>>>>
>>>> I think this is a fair summary of the discussions we've had.
>>>>
>>>
>>> Do we have any sort of trigger test suite, or high-level properties
>>> triggers must satisfy?
>>>
>>> I'd propose the following property:
>>> Given a PCollection with a particular trigger, that is being passed
>>> through a GBK, processing through the downstream aggregations by default
>>> should happen without dropping or introducing any more data [unless a
>>> triggering strategy is explicitly set on a downstream aggregation to
>>> something that violates this property].
>>>
>>> For example:
>>> PCollection<V> pc = ...  // e.g. TestStream
>>>
>>> PCollection<V> regroup(PCollection<V> pc, int n) {
>>>   return pc.apply(WithKeys.of(random key in 0..n))
>>>       .apply(GroupByKey.create())
>>>       .apply(Values.create())
>>>       .apply(Flatten.iterables());
>>> }
>>>
>>> Then pc.regroup(n).regroup(1) == pc.regroup(n), regardless of pc's
>>> triggering, windowing and accumulation mode.
>>>
>>> This property is currently definitely violated for some triggers; and
>>> it's definitely violated for the accumulating mode; both aspects must be
>>> fixed.
>>>
>>>
>>>> - Robert
>>>>
>>>
>

Re: Guarding against unsafe triggers at construction time

Posted by Reuven Lax <re...@google.com>.
Which intuitive property?

On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <ki...@google.com>
wrote:

> I created a test demonstrating that our triggers violate this pretty
> intuitive property.
>
> https://github.com/apache/beam/pull/4239
>
> I think we should discuss this: seems like a pretty important property to
> me and seems pretty bad that it's violated. Again, per discussion below,
> some of it should be addressed with changing semantics of individual
> triggers, and some with prohibiting certain combinations of triggers /
> accumulation modes / GBKs.
>
> I tried changing the continuation trigger of some triggers to be the
> "always fire" trigger, however we have a lot of code assuming that it will
> be a OnceTrigger (e.g. that continuation of a OnceTrigger has to be a
> OnceTrigger, and of course that components of a OnceTrigger have to be a
> OnceTrigger) and I didn't yet dig into whether that division is at all
> important or just baked in at compile time.
>
> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>> > Hi,
>>> >
>>> > After a recent investigation of a data loss bug caused by unintuitive
>>> > behavior of some kinds of triggers, we had a discussion about how we
>>> can
>>> > protect against future issues like this, and I summarized it in
>>> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>> >
>>> > Current Beam trigger semantics are rather confusing and in some cases
>>> > extremely unsafe, especially if the pipeline includes multiple chained
>>> GBKs.
>>> > One example of that is https://issues.apache.org/jira/browse/BEAM-3169
>>> .
>>> >
>>> > There's multiple issues:
>>> >
>>> > The API allows users to specify terminating top-level triggers (e.g.
>>> > "trigger a pane after receiving 10000 elements in the window, and
>>> that's
>>> > it"), but experience from user support shows that this is nearly
>>> always a
>>> > mistake and the user did not intend to drop all further data.
>>> >
>>> > In general, triggers are the only place in Beam where data is being
>>> dropped
>>> > without making a lot of very loud noise about it - a practice for
>>> which the
>>> > PTransform style guide uses the language: "never, ever, ever do this".
>>> >
>>> > Continuation triggers are still worse. For context: continuation
>>> trigger is
>>> > the trigger that's set on the output of a GBK and controls further
>>> > aggregation of the results of this aggregation by downstream GBKs. The
>>> > output shouldn't just use the same trigger as the input, because e.g.
>>> if the
>>> > input trigger said "wait for an hour before emitting a pane", that
>>> doesn't
>>> > mean that we should wait for another hour before emitting a result of
>>> > aggregating the result of the input trigger. Continuation triggers try
>>> to
>>> > simulate the behavior "as if a pane of the input propagated through the
>>> > entire pipeline", but the implementation of individual continuation
>>> triggers
>>> > doesn't do that. E.g. the continuation of "first N elements in pane"
>>> trigger
>>> > is "first 1 element in pane", and if the results of a first GBK are
>>> further
>>> > grouped by a second GBK onto more coarse key (e.g. if everything is
>>> grouped
>>> > onto the same key), that effectively means that, of the keys of the
>>> first
>>> > GBK, only one survives and all others are dropped (what happened in
>>> the data
>>> > loss bug).
>>> >
>>> > The ultimate fix to all of these things is
>>> > https://s.apache.org/beam-sink-triggers . However, it is a huge model
>>> > change, and meanwhile we have to do something. The options are, in
>>> order of
>>> > increasing backward incompatibility (but incompatibility in a
>>> "rejecting
>>> > something that previously was accepted but extremely dangerous" kind of
>>> > way):
>>> >
>>> > Make the continuation trigger of most triggers be the "always-fire"
>>> trigger.
>>> > Seems that this should be the case for all triggers except the
>>> watermark
>>> > trigger. This will definitely increase safety, but lead to more eager
>>> firing
>>> > of downstream aggregations. It also will violate a user's expectation
>>> that a
>>> > fire-once trigger fires everything downstream only once, but that
>>> > expectation appears impossible to satisfy safely.
>>>
>>> Note that firing more often for multiply stacked triggers, especially
>>> in the case of accumulating mode, easily leads to data corruption bugs
>>> where elements are duplicated and/or overcounted. I suppose that bad
>>> data is slightly easier to detect than missing data, but automatically
>>> turning once triggers into more-than-once triggers (automatically, or
>>> manually by prohibiting the former) isn't a straightforward fix.
>>>
>>
>> I agree that the combination of accumulating mode and stacked GBKs is
>> very problematic.
>> I think that the way it's defined now is a semantic bug that produces
>> garbage (duplicated data) with any repeated trigger, but garbage (lost
>> data, per current discussion) with any non-repeated trigger as well.
>>
>> I think we should prohibit the combination of accumulating mode and
>> stacked GBKs until we sort this out, which is again probably Sink Triggers:
>> per that document, I suppose, the accumulating mode would be set only on
>> the sink step, and upstream steps would get discarding mode [and downstream
>> steps would have to set something explicitly].
>>
>> By "prohibit" I mean "GBK applied to something with accumulating mode
>> should return a PCollection with an invalid trigger, where you have to
>> explicitly configure a trigger before you can GBK again". Does this sound
>> reasonable?
>>
>> If we agree on that, then I think having the continuation of all triggers
>> be the "always-fire" trigger is also safe?
>>
>>
>>>
>>> > Make the continuation trigger of some triggers be the "invalid"
>>> trigger,
>>> > i.e. require the user to set it explicitly: there's in general no good
>>> and
>>> > safe way to infer what a trigger on a second GBK "truly" should be,
>>> based on
>>> > the trigger of the PCollection input into a first GBK. This is
>>> especially
>>> > true for terminating triggers.
>>>
>>> I think this should be on the table until we come up with good
>>> semantics, which may not be until sink triggers. It's backwards
>>> incompatible, but in a bug-fixing and verbose way (as opposed to silently
>>> changing semantics/behavior/outputs with the first option). Note that
>>> sometimes for composite operations one does not have access to the
>>> second grouping operation, so we must at least provide an opt-out
>>> flag.
>>>
>> I would argue that the failure in those cases is Working As Intended: the
>> pipeline has undefined behavior and we're forcing the user to remove
>> the ambiguity - either by specifying a repeated trigger (which they
>> probably should have done in the first place) or by specifying the next
>> trigger explicitly (which, granted, may require changing the composite
>> transform, because the transform hasn't accounted for potential usage in
>> this ambiguous case)
>>
>>
>>>
>>> > Prohibit top-level terminating triggers entirely. This will ensure
>>> that the
>>> > only data that ever gets dropped is "droppably late" data.
>>>
>>> The AfterWatermark.withEarlyFirings(...) is a top-level terminating
>>> trigger that we should keep allowing, and has perfectly reasonable
>>> continuation semantics. I suppose we could only allow it if allowed
>>> lateness is 0, but that makes things even more context sensitive.
>>>
>>> > Do people think that these options are sensible?
>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our
>>> > discussion?
>>>
>>> I think this is a fair summary of the discussions we've had.
>>>
>>
>> Do we have any sort of trigger test suite, or high-level properties
>> triggers must satisfy?
>>
>> I'd propose the following property:
>> Given a PCollection with a particular trigger, that is being passed
>> through a GBK, processing through the downstream aggregations by default
>> should happen without dropping or introducing any more data [unless a
>> triggering strategy is explicitly set on a downstream aggregation to
>> something that violates this property].
>>
>> For example:
>> PCollection<V> pc = ...  // e.g. TestStream
>>
>> PCollection<V> regroup(PCollection<V> pc, int n) {
>>   return pc.apply(WithKeys.of(random key in 0..n))
>>       .apply(GroupByKey.create())
>>       .apply(Values.create())
>>       .apply(Flatten.iterables());
>> }
>>
>> Then pc.regroup(n).regroup(1) == pc.regroup(n), regardless of pc's
>> triggering, windowing and accumulation mode.
>>
>> This property is currently definitely violated for some triggers; and
>> it's definitely violated for the accumulating mode; both aspects must be
>> fixed.
>>
>>
>>> - Robert
>>>
>>

Re: Guarding against unsafe triggers at construction time

Posted by Eugene Kirpichov <ki...@google.com>.
I created a test demonstrating that our triggers violate this pretty
intuitive property.

https://github.com/apache/beam/pull/4239

I think we should discuss this: seems like a pretty important property to
me and seems pretty bad that it's violated. Again, per discussion below,
some of it should be addressed with changing semantics of individual
triggers, and some with prohibiting certain combinations of triggers /
accumulation modes / GBKs.

I tried changing the continuation trigger of some triggers to be the
"always fire" trigger; however, we have a lot of code assuming that it will
be a OnceTrigger (e.g. that continuation of a OnceTrigger has to be a
OnceTrigger, and of course that components of a OnceTrigger have to be a
OnceTrigger), and I haven't yet dug into whether that division is at all
important or just baked in at compile time.
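
One way such an assumption can end up baked in at compile time is through a
covariant return type. The sketch below is purely illustrative: all class
names are hypothetical, and it does not claim to reflect how Beam's trigger
classes are actually written.

```java
/** Toy stand-in for a trigger; every name in this sketch is hypothetical. */
abstract class ToyTrigger {
    abstract ToyTrigger continuation();
}

/** A toy "fires at most once" trigger. */
abstract class ToyOnceTrigger extends ToyTrigger {
    // Covariant return type: every subclass must return a ToyOnceTrigger.
    @Override
    abstract ToyOnceTrigger continuation();
}

/** A toy repeated trigger that is its own continuation. */
final class ToyAlwaysFire extends ToyTrigger {
    @Override
    ToyTrigger continuation() {
        return this;
    }
}

/** A toy "first N elements" trigger. */
final class ToyAfterCount extends ToyOnceTrigger {
    final int count;

    ToyAfterCount(int count) {
        this.count = count;
    }

    @Override
    ToyOnceTrigger continuation() {
        // return new ToyAlwaysFire();  // would not compile: not a ToyOnceTrigger
        return new ToyAfterCount(1);
    }
}
```

In a hierarchy shaped like this, switching a continuation to "always fire"
means loosening the return type in the base class first, which then ripples
through every caller that relies on getting a once-trigger back.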

On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <ki...@google.com>
wrote:

> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <ki...@google.com>
>> wrote:
>> > Hi,
>> >
>> > After a recent investigation of a data loss bug caused by unintuitive
>> > behavior of some kinds of triggers, we had a discussion about how we can
>> > protect against future issues like this, and I summarized it in
>> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>> >
>> > Current Beam trigger semantics are rather confusing and in some cases
>> > extremely unsafe, especially if the pipeline includes multiple chained
>> GBKs.
>> > One example of that is https://issues.apache.org/jira/browse/BEAM-3169
>> .
>> >
>> > There's multiple issues:
>> >
>> > The API allows users to specify terminating top-level triggers (e.g.
>> > "trigger a pane after receiving 10000 elements in the window, and that's
>> > it"), but experience from user support shows that this is nearly always
>> a
>> > mistake and the user did not intend to drop all further data.
>> >
>> > In general, triggers are the only place in Beam where data is being
>> dropped
>> > without making a lot of very loud noise about it - a practice for which
>> the
>> > PTransform style guide uses the language: "never, ever, ever do this".
>> >
>> > Continuation triggers are still worse. For context: continuation
>> trigger is
>> > the trigger that's set on the output of a GBK and controls further
>> > aggregation of the results of this aggregation by downstream GBKs. The
>> > output shouldn't just use the same trigger as the input, because e.g.
>> if the
>> > input trigger said "wait for an hour before emitting a pane", that
>> doesn't
>> > mean that we should wait for another hour before emitting a result of
>> > aggregating the result of the input trigger. Continuation triggers try
>> to
>> > simulate the behavior "as if a pane of the input propagated through the
>> > entire pipeline", but the implementation of individual continuation
>> triggers
>> > doesn't do that. E.g. the continuation of "first N elements in pane"
>> trigger
>> > is "first 1 element in pane", and if the results of a first GBK are
>> further
>> > grouped by a second GBK onto more coarse key (e.g. if everything is
>> grouped
>> > onto the same key), that effectively means that, of the keys of the
>> first
>> > GBK, only one survives and all others are dropped (what happened in the
>> data
>> > loss bug).
>> >
>> > The ultimate fix to all of these things is
>> > https://s.apache.org/beam-sink-triggers . However, it is a huge model
>> > change, and meanwhile we have to do something. The options are, in
>> order of
>> > increasing backward incompatibility (but incompatibility in a "rejecting
>> > something that previously was accepted but extremely dangerous" kind of
>> > way):
>> >
>> > Make the continuation trigger of most triggers be the "always-fire"
>> trigger.
>> > Seems that this should be the case for all triggers except the watermark
>> > trigger. This will definitely increase safety, but lead to more eager
>> firing
>> > of downstream aggregations. It also will violate a user's expectation
>> that a
>> > fire-once trigger fires everything downstream only once, but that
>> > expectation appears impossible to satisfy safely.
>>
>> Note that firing more often for multiply stacked triggers, especially
>> in the case of accumulating mode, easily leads to data corruption bugs
>> where elements are duplicated and/or overcounted. I suppose that bad
>> data is slightly easier to detect than missing data, but automatically
>> turning once triggers into more-than-once triggers (automatically, or
>> manually by prohibiting the former) isn't a straightforward fix.
>>
>
> I agree that the combination of accumulating mode and stacked GBKs is very
> problematic.
> I think that the way it's defined now is a semantic bug that produces
> garbage (duplicated data) with any repeated trigger, but garbage (lost
> data, per current discussion) with any non-repeated trigger as well.
>
> I think we should prohibit the combination of accumulating mode and
> stacked GBKs until we sort this out, which is again probably Sink Triggers:
> per that document, I suppose, the accumulating mode would be set only on
> the sink step, and upstream steps would get discarding mode [and downstream
> steps would have to set something explicitly].
>
> By "prohibit" I mean "GBK applied to something with accumulating mode
> should return a PCollection with an invalid trigger, where you have to
> explicitly configure a trigger before you can GBK again". Does this sound
> reasonable?
>
> If we agree on that, then I think having the continuation of all triggers
> be the "always-fire" trigger is also safe?
>
>
>>
>> > Make the continuation trigger of some triggers be the "invalid" trigger,
>> > i.e. require the user to set it explicitly: there's in general no good
>> > and safe way to infer what a trigger on a second GBK "truly" should be,
>> > based on the trigger of the PCollection input into a first GBK. This is
>> > especially true for terminating triggers.
>>
>> I think this should be on the table until we come up with good
>> semantics, which may not be until sink triggers. It's backwards
>> incompatible, but in a bug-fixing and verbose way (as opposed to silently
>> changing semantics/behavior/outputs with the first option). Note that
>> sometimes for composite operations one does not have access to the
>> second grouping operation, so we must at least provide an opt-out
>> flag.
>>
> I would argue that the failure in those cases is Working As Intended: the
> pipeline is having undefined behavior and we're forcing the user to remove
> the ambiguity - either by specifying a repeated trigger (which they
> probably should have done in the first place) or by specifying the next
> trigger explicitly (which, granted, may require changing the composite
> transform, because the transform hasn't accounted for potential usage in
> this ambiguous case)
>
>
>>
>> > Prohibit top-level terminating triggers entirely. This will ensure that
>> > the only data that ever gets dropped is "droppably late" data.
>>
>> The AfterWatermark.withEarlyFirings(...) is a top-level terminating
>> trigger that we should keep allowing, and has perfectly reasonable
>> continuation semantics. I suppose we could only allow it if allowed
>> lateness is 0, but that makes things even more context sensitive.
>>
>> > Do people think that these options are sensible?
>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our
>> > discussion?
>>
>> I think this is a fair summary of the discussions we've had.
>>
>
> Do we have any sort of trigger test suite, or high-level properties
> triggers must satisfy?
>
> I'd propose the following property:
> Given a PCollection with a particular trigger, that is being passed
> through a GBK, processing through the downstream aggregations by default
> should happen without dropping or introducing any more data [unless a
> triggering strategy is explicitly set on a downstream aggregation to
> something that violates this property].
>
> For example:
> PCollection<V> pc = ...  // e.g. TestStream
>
> PCollection<V> regroup(PCollection<V> pc, int n) {
>   return pc.apply(WithKeys.of(random key in 0..n))
>       .apply(GroupByKey.create())
>       .apply(Values.create())
>       .apply(Flatten.iterables());
> }
>
> Then pc.regroup(n).regroup(1) == pc.regroup(n), regardless of pc's
> triggering, windowing and accumulation mode.
>
> This property is currently definitely violated for some triggers; and it's
> definitely violated for the accumulating mode; both aspects must be fixed.
>
>
>> - Robert
>>
>

Re: Guarding against unsafe triggers at construction time

Posted by Eugene Kirpichov <ki...@google.com>.
On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <ro...@google.com> wrote:

> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <ki...@google.com>
> wrote:
> > Hi,
> >
> > After a recent investigation of a data loss bug caused by unintuitive
> > behavior of some kinds of triggers, we had a discussion about how we can
> > protect against future issues like this, and I summarized it in
> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
> >
> > Current Beam trigger semantics are rather confusing and in some cases
> > extremely unsafe, especially if the pipeline includes multiple chained
> > GBKs. One example of that is
> > https://issues.apache.org/jira/browse/BEAM-3169 .
> >
> > There's multiple issues:
> >
> > The API allows users to specify terminating top-level triggers (e.g.
> > "trigger a pane after receiving 10000 elements in the window, and that's
> > it"), but experience from user support shows that this is nearly always a
> > mistake and the user did not intend to drop all further data.
> >
> > In general, triggers are the only place in Beam where data is being
> > dropped without making a lot of very loud noise about it - a practice for
> > which the PTransform style guide uses the language: "never, ever, ever do
> > this".
> >
> > Continuation triggers are still worse. For context: continuation trigger
> > is the trigger that's set on the output of a GBK and controls further
> > aggregation of the results of this aggregation by downstream GBKs. The
> > output shouldn't just use the same trigger as the input, because e.g. if
> > the input trigger said "wait for an hour before emitting a pane", that
> > doesn't mean that we should wait for another hour before emitting a
> > result of aggregating the result of the input trigger. Continuation
> > triggers try to simulate the behavior "as if a pane of the input
> > propagated through the entire pipeline", but the implementation of
> > individual continuation triggers doesn't do that. E.g. the continuation
> > of "first N elements in pane" trigger is "first 1 element in pane", and
> > if the results of a first GBK are further grouped by a second GBK onto
> > more coarse key (e.g. if everything is grouped onto the same key), that
> > effectively means that, of the keys of the first GBK, only one survives
> > and all others are dropped (what happened in the data loss bug).
> >
> > The ultimate fix to all of these things is
> > https://s.apache.org/beam-sink-triggers . However, it is a huge model
> > change, and meanwhile we have to do something. The options are, in order
> > of increasing backward incompatibility (but incompatibility in a
> > "rejecting something that previously was accepted but extremely
> > dangerous" kind of way):
> >
> > Make the continuation trigger of most triggers be the "always-fire"
> > trigger. Seems that this should be the case for all triggers except the
> > watermark trigger. This will definitely increase safety, but lead to more
> > eager firing of downstream aggregations. It also will violate a user's
> > expectation that a fire-once trigger fires everything downstream only
> > once, but that expectation appears impossible to satisfy safely.
>
> Note that firing more often for multiply stacked triggers, especially
> in the case of accumulating mode, easily leads to data corruption bugs
> where elements are duplicated and/or overcounted. I suppose that bad
> data is slightly easier to detect than missing data, but automatically
> turning once triggers into more-than-once triggers (automatically, or
> manually by prohibiting the former) isn't a straightforward fix.
>

I agree that the combination of accumulating mode and stacked GBKs is very
problematic.
I think that the way it's defined now is a semantic bug that produces
garbage either way: duplicated data with any repeated trigger, and lost
data (per the current discussion) with any non-repeated trigger.

I think we should prohibit the combination of accumulating mode and stacked
GBKs until we sort this out, which is again probably Sink Triggers: per
that document, I suppose, the accumulating mode would be set only on the
sink step, and upstream steps would get discarding mode [and downstream
steps would have to set something explicitly].

By "prohibit" I mean "GBK applied to something with accumulating mode
should return a PCollection with an invalid trigger, where you have to
explicitly configure a trigger before you can GBK again". Does this sound
reasonable?

If we agree on that, then I think having the continuation of all triggers
be the "always-fire" trigger is also safe?
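
To make the "prohibit" idea concrete, here is a minimal sketch of such a
construction-time check. It is a toy model, not Beam code: the names
InvalidTriggerSketch, Coll, Mode, Trigger and groupByKey are all hypothetical
stand-ins, and a real change would presumably live wherever the SDK computes
the output windowing strategy of a GBK.

```java
// Hypothetical model, not Beam code: a GBK whose output carries an "invalid"
// trigger whenever the input uses accumulating mode.
class InvalidTriggerSketch {
  enum Mode { DISCARDING, ACCUMULATING }
  enum Trigger { ALWAYS_FIRE, INVALID }

  // Stand-in for a PCollection's windowing strategy (trigger and mode only).
  static final class Coll {
    final Trigger trigger;
    final Mode mode;

    Coll(Trigger trigger, Mode mode) {
      this.trigger = trigger;
      this.mode = mode;
    }

    // Explicitly configuring a trigger replaces the "invalid" marker.
    Coll withTrigger(Trigger explicit) {
      return new Coll(explicit, mode);
    }
  }

  // Construction-time check: reject an input whose trigger was never chosen.
  static Coll groupByKey(Coll input) {
    if (input.trigger == Trigger.INVALID) {
      throw new IllegalStateException(
          "Set a trigger explicitly before applying another GroupByKey");
    }
    Trigger continuation =
        input.mode == Mode.ACCUMULATING ? Trigger.INVALID : Trigger.ALWAYS_FIRE;
    return new Coll(continuation, input.mode);
  }

  public static void main(String[] args) {
    Coll once = groupByKey(new Coll(Trigger.ALWAYS_FIRE, Mode.ACCUMULATING));
    try {
      groupByKey(once); // second GBK without an explicit trigger
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    // Opting in explicitly makes the second GBK legal again.
    groupByKey(once.withTrigger(Trigger.ALWAYS_FIRE));
  }
}
```

In this model a discarding-mode input gets the "always-fire" continuation,
while an accumulating-mode input forces the user to choose before the next
GBK, so the failure shows up when the pipeline is built rather than as bad
data at runtime.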


>
> > Make the continuation trigger of some triggers be the "invalid" trigger,
> > i.e. require the user to set it explicitly: there's in general no good
> > and safe way to infer what a trigger on a second GBK "truly" should be,
> > based on the trigger of the PCollection input into a first GBK. This is
> > especially true for terminating triggers.
>
> I think this should be on the table until we come up with good
> semantics, which may not be until sink triggers. It's backwards
> incompatible, but in a bug-fixing and verbose way (as opposed to silently
> changing semantics/behavior/outputs with the first option). Note that
> sometimes for composite operations one does not have access to the
> second grouping operation, so we must at least provide an opt-out
> flag.
>
I would argue that the failure in those cases is Working As Intended: the
pipeline has undefined behavior and we're forcing the user to remove the
ambiguity - either by specifying a repeated trigger (which they probably
should have done in the first place) or by specifying the next trigger
explicitly (which, granted, may require changing the composite transform,
because the transform hasn't accounted for potential usage in this
ambiguous case).


>
> > Prohibit top-level terminating triggers entirely. This will ensure that
> > the only data that ever gets dropped is "droppably late" data.
>
> The AfterWatermark.withEarlyFirings(...) is a top-level terminating
> trigger that we should keep allowing, and has perfectly reasonable
> continuation semantics. I suppose we could only allow it if allowed
> lateness is 0, but that makes things even more context sensitive.
>
> > Do people think that these options are sensible?
> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our
> > discussion?
>
> I think this is a fair summary of the discussions we've had.
>

Do we have any sort of trigger test suite, or high-level properties
triggers must satisfy?

I'd propose the following property:
Given a PCollection with a particular trigger, that is being passed through
a GBK, processing through the downstream aggregations by default should
happen without dropping or introducing any more data [unless a triggering
strategy is explicitly set on a downstream aggregation to something that
violates this property].

For example:
PCollection<V> pc = ...  // e.g. TestStream

PCollection<V> regroup(PCollection<V> pc, int n) {
  return pc.apply(WithKeys.of(random key in 0..n))
      .apply(GroupByKey.create())
      .apply(Values.create())
      .apply(Flatten.iterables());
}

Then regroup(regroup(pc, n), 1) == regroup(pc, n), regardless of pc's
triggering, windowing and accumulation mode.

This property is currently definitely violated for some triggers; and it's
definitely violated for the accumulating mode; both aspects must be fixed.
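
A toy, Beam-free simulation of how a terminating continuation trigger
violates this property. Everything here is an assumption made for
illustration: the class RegroupPropertySketch and its regroup(...) signature
are hypothetical, keys are assigned round-robin instead of randomly so the
run is deterministic, each key fires once it has buffered `threshold`
elements, and buffers that never fired are flushed at the end as a stand-in
for window expiry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of regroup(): key, group, fire, flatten.
class RegroupPropertySketch {
  static List<Integer> regroup(
      List<Integer> input, int numKeys, int threshold, boolean repeat) {
    Map<Integer, List<Integer>> buffers = new TreeMap<>();
    Set<Integer> finished = new HashSet<>();
    List<Integer> output = new ArrayList<>();
    for (int i = 0; i < input.size(); i++) {
      int key = i % numKeys; // deterministic stand-in for "random key in 0..n"
      if (finished.contains(key)) {
        continue; // trigger already finished: the element is silently dropped
      }
      List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
      buffer.add(input.get(i));
      if (buffer.size() >= threshold) {
        output.addAll(buffer); // fire a pane and flatten it
        buffer.clear();        // discarding mode
        if (!repeat) {
          finished.add(key);   // terminating trigger: no further panes
        }
      }
    }
    // Assumed stand-in for window expiry: flush buffers that never fired.
    for (List<Integer> leftover : buffers.values()) {
      output.addAll(leftover);
    }
    return output;
  }

  public static void main(String[] args) {
    List<Integer> pc = Arrays.asList(1, 2, 3, 4, 5, 6);
    // First GBK: "first 2 elements in pane" on 3 keys; nothing is lost here.
    List<Integer> once = regroup(pc, 3, 2, false);
    // Second GBK on a single key with the "first 1 element" continuation.
    List<Integer> terminating = regroup(once, 1, 1, false);
    // Same second GBK with a repeated ("always-fire") continuation.
    List<Integer> alwaysFire = regroup(once, 1, 1, true);
    System.out.println(once.size());        // prints 6
    System.out.println(terminating.size()); // prints 1
    System.out.println(alwaysFire.size());  // prints 6
  }
}
```

With the terminating continuation only one of the six elements survives the
second regroup, which is the shape of the data loss described in this
thread; with the repeated continuation the two sides of the property agree
in this discarding-mode model.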


> - Robert
>

Re: Guarding against unsafe triggers at construction time

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <ki...@google.com> wrote:
> Hi,
>
> After a recent investigation of a data loss bug caused by unintuitive
> behavior of some kinds of triggers, we had a discussion about how we can
> protect against future issues like this, and I summarized it in
> https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>
> Current Beam trigger semantics are rather confusing and in some cases
> extremely unsafe, especially if the pipeline includes multiple chained GBKs.
> One example of that is https://issues.apache.org/jira/browse/BEAM-3169 .
>
> There's multiple issues:
>
> The API allows users to specify terminating top-level triggers (e.g.
> "trigger a pane after receiving 10000 elements in the window, and that's
> it"), but experience from user support shows that this is nearly always a
> mistake and the user did not intend to drop all further data.
>
> In general, triggers are the only place in Beam where data is being dropped
> without making a lot of very loud noise about it - a practice for which the
> PTransform style guide uses the language: "never, ever, ever do this".
>
> Continuation triggers are still worse. For context: continuation trigger is
> the trigger that's set on the output of a GBK and controls further
> aggregation of the results of this aggregation by downstream GBKs. The
> output shouldn't just use the same trigger as the input, because e.g. if the
> input trigger said "wait for an hour before emitting a pane", that doesn't
> mean that we should wait for another hour before emitting a result of
> aggregating the result of the input trigger. Continuation triggers try to
> simulate the behavior "as if a pane of the input propagated through the
> entire pipeline", but the implementation of individual continuation triggers
> doesn't do that. E.g. the continuation of "first N elements in pane" trigger
> is "first 1 element in pane", and if the results of a first GBK are further
> grouped by a second GBK onto more coarse key (e.g. if everything is grouped
> onto the same key), that effectively means that, of the keys of the first
> GBK, only one survives and all others are dropped (what happened in the data
> loss bug).
>
> The ultimate fix to all of these things is
> https://s.apache.org/beam-sink-triggers . However, it is a huge model
> change, and meanwhile we have to do something. The options are, in order of
> increasing backward incompatibility (but incompatibility in a "rejecting
> something that previously was accepted but extremely dangerous" kind of
> way):
>
> Make the continuation trigger of most triggers be the "always-fire" trigger.
> Seems that this should be the case for all triggers except the watermark
> trigger. This will definitely increase safety, but lead to more eager firing
> of downstream aggregations. It also will violate a user's expectation that a
> fire-once trigger fires everything downstream only once, but that
> expectation appears impossible to satisfy safely.

Note that firing more often for multiply stacked triggers, especially
in the case of accumulating mode, easily leads to data corruption bugs
where elements are duplicated and/or overcounted. I suppose that bad
data is slightly easier to detect than missing data, but automatically
turning once triggers into more-than-once triggers (automatically, or
manually by prohibiting the former) isn't a straightforward fix.
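
A small arithmetic sketch of the overcounting concern, under simplifying
assumptions: a single key, an upstream sum that fires after every element,
and a downstream sum that adds up every pane it receives. The class
AccumulatingOvercountSketch and its methods are hypothetical and do not use
any Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of two stacked sums with a repeatedly firing trigger.
class AccumulatingOvercountSketch {
  // Panes emitted by the upstream sum, one per input element.
  static List<Integer> upstreamPanes(List<Integer> elements, boolean accumulating) {
    List<Integer> panes = new ArrayList<>();
    int state = 0;
    for (int element : elements) {
      state += element;
      panes.add(state);
      if (!accumulating) {
        state = 0; // discarding mode: each pane holds only the new data
      }
    }
    return panes;
  }

  // The downstream aggregation treats every pane as a fresh element.
  static int downstreamSum(List<Integer> panes) {
    int total = 0;
    for (int pane : panes) {
      total += pane;
    }
    return total;
  }

  public static void main(String[] args) {
    List<Integer> elements = Arrays.asList(1, 2, 3); // true total is 6
    System.out.println(downstreamSum(upstreamPanes(elements, false))); // prints 6
    System.out.println(downstreamSum(upstreamPanes(elements, true)));  // prints 10
  }
}
```

In accumulating mode the upstream panes are 1, 3 and 6, so the downstream
sum counts the first element three times and the second twice.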

> Make the continuation trigger of some triggers be the "invalid" trigger,
> i.e. require the user to set it explicitly: there's in general no good and
> safe way to infer what a trigger on a second GBK "truly" should be, based on
> the trigger of the PCollection input into a first GBK. This is especially
> true for terminating triggers.

I think this should be on the table until we come up with good
semantics, which may not be until sink triggers. It's backwards
incompatible, but in a bug-fixing and verbose way (as opposed to silently
changing semantics/behavior/outputs with the first option). Note that
sometimes for composite operations one does not have access to the
second grouping operation, so we must at least provide an opt-out
flag.

> Prohibit top-level terminating triggers entirely. This will ensure that the
> only data that ever gets dropped is "droppably late" data.

The AfterWatermark.withEarlyFirings(...) is a top-level terminating
trigger that we should keep allowing, and has perfectly reasonable
continuation semantics. I suppose we could only allow it if allowed
lateness is 0, but that makes things even more context sensitive.

> Do people think that these options are sensible?
> +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our
> discussion?

I think this is a fair summary of the discussions we've had.

- Robert