Posted to dev@beam.apache.org by Robert Bradshaw <ro...@google.com> on 2021/03/18 00:39:01 UTC

Re: Do we need synchronized processing time? / What to do about "continuation triggers"?

On Thu, Feb 25, 2021 at 12:53 AM Jan Lukavský <je...@seznam.cz> wrote:

> I get this, this makes total sense. But what else could the
> propagation meaningfully do, other than propagate the 10-second triggering to
> the very first GBK(s) and then try to push the outcome of these PTransforms
> as fast as possible through the pipeline? Yes, it seems it would require
> retractions, at least in cases when the DAG contains multiple paths from
> root(s) to leaf. It seems to me that the intermediate GBK(s) play no role,
> because if they do not trigger as fast as possible (and retract wrongly
> triggered outputs due to out-of-orderness), all they do is add
> latency and actually make the "sink triggering" not trigger at the
> configured frequency. Everything else seems clear to me, I just don't get
> this part. Is it possible to describe a specific example where an inner
> GBK would trigger with some trigger other than on each pane?
>
I just realized this was never answered. Yes, I do think the initial
triggering would (typically) be propagated up to the initial GBK (or
possibly even influence the Source), though note that "triggering" here
means late data handling, accumulation mode, etc., not just the trigger.
It's possible that multiple sinks could flow up to the same roots, in which
case a triggering would be chosen that could satisfy them both. As for
intermediate GBKs, I do not think the spec would require them to fire as
fast as possible. For example, if the request was to be up to date to the
hour, one could certainly imagine intermediate GBKs firing at some modest
fraction of an hour to balance latency with performance (and perhaps the
upstream triggers firing faster than once an hour as well, to compute
partial results rather than having expensive "data dumps" every 60
minutes).

The biggest shift, of course, is moving to a more declarative API that
describes one's intent, so the system can work out the best way to satisfy it.
Many details TBD.
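
As a rough illustration of the "propagate up to the roots" idea, here is a minimal sketch in plain Python. It is not Beam code and not a proposed design: the function name, the node names and the "latency in seconds" representation are all assumptions made for this example. Each sink requests an update period, and every upstream node inherits the tightest period requested by any sink it feeds.

```python
from collections import defaultdict


def propagate_sink_latency(edges, sink_latency_s):
    """Return, per node, the tightest latency requested by any sink it feeds.

    edges: list of (upstream, downstream) pairs describing the pipeline DAG.
    sink_latency_s: dict mapping sink name -> requested update period (seconds).
    Both structures are hypothetical; Beam has no such API today.
    """
    upstream_of = defaultdict(list)
    for up, down in edges:
        upstream_of[down].append(up)

    required = dict(sink_latency_s)
    stack = list(sink_latency_s)
    while stack:
        node = stack.pop()
        for up in upstream_of[node]:
            # A node must be at least as fresh as its most demanding consumer.
            if required[node] < required.get(up, float("inf")):
                required[up] = required[node]
                stack.append(up)
    return required


edges = [
    ("source", "gbk_a"),
    ("gbk_a", "gbk_b"),
    ("gbk_b", "hourly_sink"),
    ("gbk_a", "ten_second_sink"),
]
required = propagate_sink_latency(
    edges, {"hourly_sink": 3600, "ten_second_sink": 10}
)
print(required["gbk_a"], required["gbk_b"])  # 10 3600
```

The result is only an upper bound on how stale each node may be. It says nothing about accumulation mode or late data, and a runner would remain free to fire gbk_b more often than once an hour to avoid large hourly "data dumps".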



>  Jan
> On 2/25/21 12:44 AM, Kenneth Knowles wrote:
>
>
>
> On Wed, Feb 24, 2021 at 12:44 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Robert,
>>
>> > Here "sink" is really any observable outside effect, so I think "how
>> > often output should be written" and "how quickly output should react to the
>> > change of input" are the same.
>>
>> The difference is in the input trigger - let's imagine, that I have two
>> chained GBKs (A and B). If I trigger A every minute, but B every second, I
>> will output 60 records per minute, but 59 of them will be the same. That's
>> why it seems to me, that meaningful "sink" triggering has to start at the
>> input and then propagate with each pane.
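
The two-GBK scenario described above can be modelled with a small plain-Python simulation. This is a sketch of the argument only, not of any runner: it assumes B re-emits its latest value on every one of its own firings, which a real runner may or may not do when no new input has arrived. The function and parameter names are made up for the example.

```python
def downstream_panes(upstream_period_s, downstream_period_s, duration_s):
    """Simulate B re-emitting its latest input on its own periodic trigger.

    A emits a new value every upstream_period_s seconds; B fires every
    downstream_period_s seconds and emits whatever it last received from A.
    Returns a list of (time_s, value) panes emitted by B.
    """
    panes = []
    latest_from_a = None
    for t in range(1, duration_s + 1):
        if t % upstream_period_s == 0:
            latest_from_a = "A@%ds" % t  # A fires: a new pane reaches B
        if t % downstream_period_s == 0 and latest_from_a is not None:
            panes.append((t, latest_from_a))  # B fires with whatever it has
    return panes


panes = downstream_panes(upstream_period_s=60, downstream_period_s=1, duration_s=180)
one_minute = [value for t, value in panes if 60 <= t < 120]
redundant = len(one_minute) - len(set(one_minute))
print(len(one_minute), redundant)  # 60 59
```

Under that assumption, one minute of B's output contains 60 panes of which 59 repeat the first, matching the numbers in the paragraph above.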
>>
> The idea with sink triggering is that the level of abstraction is raised.
> You have a DoFn (more generally IO transform) that writes to some external
> system, and you request updates every ten seconds. This specification is
> propagated to cause all the GBKs in the pipeline to emit data at a rate to
> enable updates to that IO every ten seconds.
>
> Sinks will need separate configurations to handle multiple panes
> (updates/retractions) vs final values, and we can validate that a sink can
> support a particular triggering strategy. Sinks already need this, but we
> haven't solved the problem very formally or systematically. In many cases,
> these are just two different sinks - for example a CSV file with an extra
> column to indicate overwrite/retraction is really a different sink than
> just appending. They write to the same storage system, but the relationship
> of the input records to the output storage differs.
>
> There are a lot of unsolved problems in terms of exactly how the triggering
> requirements of a sink can feed back to upstream aggregations to cause them
> to trigger at appropriate times. It could be static (inferring upstream
> triggering) but seems like it might have to be dynamic (running a state
> machine at the sink that broadcasts messages). I don't think this is
> straightforward, nor is it guaranteed to be doable without knobs or some
> fresh ideas.
>
> Kenn
>
>> > As an example, if I want, say, hourly output, triggering hourly at the
>> > source and then as quickly as possible from then on may be wasteful. It may
>> > also be desirable to arrange such that certain transforms only have a
>> > single pane per window, which is easier to propagate up than down. As
>> > another example, consider accumulating vs. discarding. If I have
>> > CombineValues(sum) followed by a re-keying and another CombineValues(sum),
>> > and I want the final output to be accumulating, the first must be
>> > discarding (or, better, retractions). Propagating upwards is possible in a
>> > way propagating downward is not.
>>
>> I'm not sure I understand this. If I want hourly output, I cannot trigger
>> the source with lower frequency. If I trigger the source hourly, but do not
>> propagate this as fast as possible, I'm inevitably introducing additional
>> latency (that's the definition of "not as fast as possible") in downstream
>> processing. Therefore the final triggering cannot be "hourly output", at
>> least not with regard to the rate of change in inputs.
>> On 2/23/21 5:47 PM, Robert Bradshaw wrote:
>>
>> On Tue, Feb 23, 2021 at 1:07 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> First, +1 to the conclusion of this thread.
>>>
>>> One note regarding the composite transforms and triggers *inside* those
>>> transforms - I think that propagating the triggering from the input PCollection
>>> might even be dangerous, and composite PTransforms that are sensitive
>>> to a change of triggering will (should!) override the input triggering,
>>> so adjusting it upfront will not work. There is a clear option for a
>>> composite PTransform (which includes one or more GBKs) to create an API to
>>> specify the _input_ triggering of the composite as a whole, i.e.
>>>
>>>  input.apply(MyComposite.create().triggering())
>>>
>>> which (consistently with how triggering works for pure GBK) would change
>>> the input triggering (if we define trigger as "buffer input in state, flush
>>> buffer when trigger fires") of the PTransform. The PTransform knows how it
>>> expands and so it is quite easy to do the output triggering correctly.
>>>
>> When we originally explored this (for windowing, before triggering
>> existed) we looked at the number of composite operations (combining,
>> joining, cogbk, ...) that contained GBKs and realized it would add a lot of
>> boilerplate to manually pass through the windowing information to each.
>> Worse, this is a burden placed on every author of a composite operation
>> (and omitting this argument, or hard coding a default, would be strictly
>> worse). Triggering doesn't flow as nicely, but requiring it on every
>> subtransform invocation during pipeline construction would have the same
>> downsides of verbosity.
>>
>>> Regarding the sink triggering - out of curiosity, how does that differ
>>> from applying the triggering on the very first GBK(s) and then
>>> triggering all downstream GBKs using AfterPane.elementCountAtLeast(1)? It
>>> seems to me that from the user's perspective what I will want to define is not
>>> "how often output should be written", but "how quickly output should react
>>> to the change of input" - therefore I *must* trigger with at least this
>>> frequency from the source and then propagate each pane as quickly as
>>> possible to the output. Am I missing something?
>>>
>> Here "sink" is really any observable outside effect, so I think "how
>> often output should be written" and "how quickly output should react to the
>> change of input" are the same.
>>
>> As an example, if I want, say, hourly output, triggering hourly at the
>> source and then as quickly as possible from then on may be wasteful. It may
>> also be desirable to arrange such that certain transforms only have a
>> single pane per window, which is easier to propagate up than down. As
>> another example, consider accumulating vs. discarding. If I have
>> CombineValues(sum) followed by a re-keying and another CombineValues(sum),
>> and I want the final output to be accumulating, the first must be
>> discarding (or, better, retractions). Propagating upwards is possible in a
>> way propagating downward is not.
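
The accumulating vs. discarding point can be checked with a little arithmetic. The sketch below is plain Python rather than Beam code, and the helper names and sample data are invented for the illustration. It models a per-key sum that fires twice, a re-keying, and a second sum that accumulates every pane it receives.

```python
from collections import defaultdict


def first_sum(firings, accumulating):
    """Per-key sum that emits one pane per trigger firing.

    firings: list of lists of (key, value) pairs; each inner list is the new
    input that arrived before one firing.
    """
    totals = defaultdict(int)
    panes = []
    for batch in firings:
        delta = defaultdict(int)
        for key, value in batch:
            delta[key] += value
            totals[key] += value
        # Accumulating panes carry the running total; discarding panes carry
        # only what arrived since the previous firing.
        source = totals if accumulating else delta
        panes.append([(key, source[key]) for key in delta])
    return panes


def rekey_and_sum(panes, rekey):
    """Second sum (accumulating): add every upstream pane into a running total."""
    totals = defaultdict(int)
    for pane in panes:
        for key, value in pane:
            totals[rekey(key)] += value
    return dict(totals)


def to_team(user):
    return "team"


firings = [[("alice", 1), ("bob", 2)], [("alice", 3)]]
discarding = rekey_and_sum(first_sum(firings, accumulating=False), to_team)
accumulating = rekey_and_sum(first_sum(firings, accumulating=True), to_team)
print(discarding)    # {'team': 6}
print(accumulating)  # {'team': 7}
```

The true total is 1 + 2 + 3 = 6. With an accumulating first stage, alice's first value is counted again inside her second pane (1 + 2 + 4 = 7), which is why the first stage has to be discarding, or use retractions, for the final accumulating result to be right.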
>>
>>
>>
>>>
>>>  Jan
>>>
>>>
>>> On 2/22/21 9:53 PM, Reuven Lax wrote:
>>>
>>> I really wish that we had found the time to build sink triggers. Jan is
>>> right - specifying triggers up front and having them propagate down is
>>> confusing (it's also a bit confusing for Windows, but with Windows the
>>> propagation at least makes sense). The fact that users rarely have access
>>> to the actual GBK operation means that allowing them to specify triggers on
>>> their sinks is the best approach.
>>>
>>> On Mon, Feb 22, 2021 at 12:48 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Mon, Feb 22, 2021 at 11:51 AM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>> I agree completely: Triggers control the output of the GBK.
>>>>>
>>>>> The issue is composite transforms, where there will be a GBK deep
>>>>> inside some code and the user cannot adjust the triggering.
>>>>>
>>>>> What a user really wants is "sink triggers
>>>>> <https://s.apache.org/beam-sink-triggers>" [1], a purely hypothetical
>>>>> feature where they specify the latency requirements on each _output_ and
>>>>> everything else is figured out automatically. Unfortunately, sink triggers
>>>>> require retractions, so each PCollection can be a complete changelog.
>>>>> Otherwise transformations cannot be transparently correct throughout a
>>>>> pipeline and triggers cannot be decoupled from pipeline logic. Retractions
>>>>> themselves are not necessarily complex in some cases (Flink SQL has them -
>>>>> they are extra easy for "pure" code) but require a massive reworking of the
>>>>> library of transforms, particularly IOs. And backwards compatibility
>>>>> concerns for existing DoFns are somewhat tricky. We've had two prototypes
>>>>> [2] [3] and some important design investigations [4], but no time to really
>>>>> finish adding them, even as just an optional experiment. And once we have
>>>>> retractions, there is still a lot to figure out to finish sink triggers.
>>>>> They may not even really be possible!
>>>>>
>>>>> So for now, we do our best with the user setting up triggering at the
>>>>> beginning of the pipeline instead of the end of the pipeline. The very
>>>>> first GBK (which may be deep in library code) is controlled by the
>>>>> triggering they set up and all the rest get the "continuation trigger"
>>>>> which tries to just let the data flow. Unless they set up another bit of
>>>>> triggering. Some of our transforms do this for various reasons.
>>>>>
>>>>> I think the conclusion of this particular thread is:
>>>>>
>>>>>  - make all the SDKs use AfterSynchronizedProcessingTime triggers
>>>>>  - allow runners to do whatever they want when they see
>>>>> AfterSynchronizedProcessingTime trigger
>>>>>  - remove TimeDomain.SYNCHRONIZED_PROCESSING_TIME from the proto
>>>>> since it is only for timers and they should not use this
>>>>>  - later, figure out if we want to add support for making downstream
>>>>> triggering optional (could be useful prep for sink triggers)
>>>>>
>>>>
>>>> +1
>>>>
>>>>
>>>>> [1] https://s.apache.org/beam-sink-triggers
>>>>> [2] https://github.com/apache/beam/pull/4742
>>>>> [3] https://github.com/apache/beam/pull/9199
>>>>> [4] https://s.apache.org/beam-retractions
>>>>>
>>>>> On Mon, Feb 22, 2021 at 1:28 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> The same holds true for pane accumulation mode.
>>>>>>
>>>>>>  Jan
>>>>>> On 2/22/21 10:21 AM, Jan Lukavský wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm not sure if I got everything from this thread right, but from my
>>>>>> point of view, triggers are a property of GBK. They are a property of neither
>>>>>> windowing nor PCollection, but relate solely to GBK. This can be seen from
>>>>>> the fact that, unlike windowFn, triggers are completely ignored in stateful
>>>>>> ParDo (there are no semantics for them, which is fine). It would be cool if
>>>>>> the model could be adjusted for that - this would actually mean that the
>>>>>> correct place to specify triggering is not the Window PTransform, but
>>>>>> the GBK, i.e.
>>>>>>
>>>>>>  input.apply(GroupByKey.create().triggering(...))
>>>>>>
>>>>>> That would imply we simply have a default trigger for all GBKs, unless
>>>>>> explicitly changed, but for that particular instance only. I'm not sure
>>>>>> what the impacts on pipeline compatibility would be, though.
>>>>>>
>>>>>>  Jan
>>>>>> On 2/19/21 12:09 AM, Robert Bradshaw wrote:
>>>>>>
>>>>>> On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <ke...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <ro...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I would prefer to leave downstream triggering up to the runner (or,
>>>>>>>> better, leave upstream triggering up to the runner, a la sink triggers),
>>>>>>>> but one problem is that without an explicit AfterSynchronizedProcessingTime
>>>>>>>> one can't tell if the downstream ProcessingTime between two groupings is
>>>>>>>> due to an explicit re-triggering between them or inherited from one to the
>>>>>>>> other.
>>>>>>>>
>>>>>>>
>>>>>>> I mean to propose that there should be no triggering specified
>>>>>>> unless due to explicit re-triggering.
>>>>>>>
>>>>>>
>>>>>> You're saying that we leave the trigger (and perhaps other) fields of
>>>>>> the WindowingStrategy attached to PCollections downstream of the first GBK
>>>>>> unset in the proto? And let runners walk over the graph to infer it? I
>>>>>> could be OK with making this legal, though updating all SDKs and Runners to
>>>>>> handle this doesn't seem high priority at the moment.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> (and BTW yes I agree about sink triggers, but we need retractions
>>>>>>> and probably some theoretical work before we can aim for that)
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>>> On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <ke...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Just for the thread I want to comment on another, more drastic
>>>>>>>>> approach: eliminate continuation triggers from the model, leaving
>>>>>>>>> downstream triggering up to a runner. This approach is not viable because
>>>>>>>>> transforms may need to change their behavior based on whether or not a
>>>>>>>>> trigger will fire more than once. Transforms can and do inspect the
>>>>>>>>> windowing strategy to do things differently.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I'll say that synchronized processing time has confused users
>>>>>>>>>> before. Users sometimes use processing-time triggers to optimize latency,
>>>>>>>>>> banking that that will decouple stage latency from the long-tail latency of
>>>>>>>>>> previous stages. However continuation triggers silently switching to
>>>>>>>>>> synchronized processing time has defeated that, and it wasn't clear to
>>>>>>>>>> users why.
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <
>>>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <ke...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <
>>>>>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Of course the right answer is to just implement sink triggers
>>>>>>>>>>>>> and sidestep the question altogether :).
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the meantime, I think leaving
>>>>>>>>>>>>> AfterSynchronizedProcessingTime in the model makes the most sense, and
>>>>>>>>>>>>> runners can choose an implementation between firing eagerly and waiting
>>>>>>>>>>>>> some amount of time until they think all (most?) downstream results are in
>>>>>>>>>>>>> before firing, depending on how smart the runner wants to be. As you point
>>>>>>>>>>>>> out, they're all correct, and we'll have multiple firings due to the
>>>>>>>>>>>>> upstream trigger anyway, and this is safer than it used to be (though still
>>>>>>>>>>>>> possibly requires work).
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Just to clarify, as I got a little confused, is your
>>>>>>>>>>>> suggestion: Leave AfterSynchronizedProcessingTime* triggers in the
>>>>>>>>>>>> model/proto, let the SDK put them in where they want, and let runners
>>>>>>>>>>>> decide how to interpret them? (this SGTM and requires the least/no changes)
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Yep. We may want to update Python/Go to produce
>>>>>>>>>>> AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too,
>>>>>>>>>>> eventually, to better express intent.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not
>>>>>>>>>>>> related to this, except in implementation, and should be removed either way.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <
>>>>>>>>>>>>> kenn@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> TL;DR:
>>>>>>>>>>>>>> 1. should we replace "after synchronized processing time"
>>>>>>>>>>>>>> with "after count 1"?
>>>>>>>>>>>>>> 2. should we remove "continuation trigger" and leave this to
>>>>>>>>>>>>>> runners?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ----
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> "AfterSynchronizedProcessingTime" triggers were invented to
>>>>>>>>>>>>>> solve a specific problem. They are inconsistent across SDKs today.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  - You have an aggregation/GBK with aligned processing time
>>>>>>>>>>>>>> trigger like ("output every minute on the minute")
>>>>>>>>>>>>>>  - You have a downstream aggregation/GBK between that and the
>>>>>>>>>>>>>> sink
>>>>>>>>>>>>>>  - You expect to have about one output every minute per
>>>>>>>>>>>>>> key+window pair
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any output of the upstream aggregation may contribute to any
>>>>>>>>>>>>>> key+window of the downstream aggregation. The
>>>>>>>>>>>>>> AfterSynchronizedProcessingTime trigger waits for all the processing time
>>>>>>>>>>>>>> based triggers to fire and commit their outputs. The downstream aggregation
>>>>>>>>>>>>>> will output as fast as possible in panes consistent with the upstream
>>>>>>>>>>>>>> aggregation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  - The Java SDK behavior is as above, to output "as fast as
>>>>>>>>>>>>>> reasonable".
>>>>>>>>>>>>>>  - The Python SDK never uses
>>>>>>>>>>>>>> "AfterSynchronizedProcessingTime" triggers but simply propagates the same
>>>>>>>>>>>>>> trigger to the next GBK, creating additional delay.
>>>>>>>>>>>>>>  - I don't know what the Go SDK may do, if it supports this
>>>>>>>>>>>>>> at all.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any behavior could be defined as "correct". A simple option
>>>>>>>>>>>>>> could be to have the downstream aggregation "fire always" aka "after
>>>>>>>>>>>>>> element count 1". How would this change things? We would potentially see
>>>>>>>>>>>>>> many more outputs.
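
To make "many more outputs" concrete, here is a minimal plain-Python sketch. It is not Beam code: the function name and the data are assumptions, and it treats the worst case in which the downstream aggregation emits a separate pane for every upstream output instead of coalescing them.

```python
def downstream_pane_count(upstream_firings, fire_on_every_element):
    """Count downstream panes for a single downstream key+window.

    upstream_firings: list of lists; each inner list holds the outputs that
    the upstream aggregation committed during one aligned firing.
    """
    panes = 0
    for outputs in upstream_firings:
        if not outputs:
            continue
        if fire_on_every_element:
            panes += len(outputs)  # "after element count 1": one pane per input
        else:
            panes += 1  # wait for the whole upstream firing, then emit once
    return panes


# Three aligned firings ("every minute on the minute"), 100 upstream keys each.
upstream = [[("key%d" % k, 1) for k in range(100)] for _ in range(3)]
fire_always = downstream_pane_count(upstream, fire_on_every_element=True)
synchronized = downstream_pane_count(upstream, fire_on_every_element=False)
print(fire_always, synchronized)  # 300 3
```

In practice a runner that bundles elements would likely land somewhere between the two numbers; the sketch only shows the bounds of the trade-off.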
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Why did we do this in the first place? There are (at least)
>>>>>>>>>>>>>> these reasons:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  - Previously, triggers could "finish" an aggregation thus
>>>>>>>>>>>>>> dropping all further data. In this case, waiting for all outputs is
>>>>>>>>>>>>>> critical or else you lose data. Now triggers cannot finish aggregations.
>>>>>>>>>>>>>>  - Whenever there may be more than one pane, a user has to
>>>>>>>>>>>>>> write logic to compensate and deal with it. Changing from guaranteed single
>>>>>>>>>>>>>> pane to multi-pane would break things. So if the user configures a single
>>>>>>>>>>>>>> firing, all downstream aggregations must respect it. Now that triggers
>>>>>>>>>>>>>> cannot finish, I think processing time can only be used in multi-pane
>>>>>>>>>>>>>> contexts anyhow.
>>>>>>>>>>>>>>  - The above example illustrates how the behavior in Java
>>>>>>>>>>>>>> maintains something that the user will expect. Or so we think. Maybe users
>>>>>>>>>>>>>> don't care.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> How did we get into this inconsistent state? When the user
>>>>>>>>>>>>>> specifies triggering it applies to the very nearest aggregation/GBK. The
>>>>>>>>>>>>>> SDK decides what triggering to insert downstream. One possibility is to
>>>>>>>>>>>>>> remove this and have it unspecified, left to runner behavior.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think maybe these pieces of complexity are both not helpful
>>>>>>>>>>>>>> and also not (necessarily) breaking changes to alter, especially
>>>>>>>>>>>>>> considering we have inconsistency in the model.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> WDYT? And I wonder what this means for xlang and
>>>>>>>>>>>>>> portability... how does continuation triggering even work? (if at all)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>