Posted to dev@beam.apache.org by Amit Sela <am...@gmail.com> on 2016/10/21 11:59:07 UTC

[DISCUSS] Deferring (pre) combine for merging windows.

I'd like to raise an issue that was discussed in BEAM-696
<https://issues.apache.org/jira/browse/BEAM-696>.
I won't recap here because it would be extensive (and probably exhaustive),
and I'd also like to restart the discussion here rather than summarize it.

*The problem*
When the (main) input is in a merging window (e.g. Sessions) and there are
sideInputs, pre-combining might lead to non-deterministic behaviour. For
example:
Main input: e1 (time: 3), e2 (time: 5)
Sessions with a gap duration of 3 -> e1 alone belongs to [3, 6), e2 alone
to [5, 8); combined, the merging of their windows yields [3, 8).
Matching sideInputs in FixedWindows of size 2 (each main-input window maps
to the fixed window containing its last timestamp) should yield: e1 ->
sideInput window [4, 6), e2 -> [6, 8), merged -> [6, 8).
Now, if the sideInput is used in a merging step of the combine and both
elements are part of the same bundle, the sideInput accessed will
correspond to [6, 8), which is the expected behaviour; but if e1 is
pre-combined in a separate bundle, it will access the sideInput for
[4, 6), which is wrong.
** this tends to be a bit confusing, so any clarifications/corrections
are most welcome.*
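
The example can be reproduced with a small simulation. This is a hypothetical sketch in plain Python, not the Beam API: windows are (start, end) tuples over integer timestamps, and the side-input mapping (the fixed window containing the main-input window's last timestamp, end - 1) is an assumption chosen to match the windows listed above.

```python
# Hypothetical simulation of the example above (plain Python, not Beam).
# Windows are half-open [start, end) intervals over integer timestamps.

GAP = 3        # Sessions gap duration
SIDE_SIZE = 2  # size of the side input's FixedWindows


def session_window(ts):
    """Window a single element gets before merging: [ts, ts + GAP)."""
    return (ts, ts + GAP)


def merge_sessions(windows):
    """Merge overlapping session windows into their union."""
    merged = []
    for start, end in sorted(windows):
        if merged and start < merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def side_input_window(main_window):
    """Assumed mapping: the fixed window containing the main window's last
    timestamp (end - 1 with integer timestamps)."""
    last_ts = main_window[1] - 1
    start = last_ts - last_ts % SIDE_SIZE
    return (start, start + SIDE_SIZE)


# Both elements in one bundle: windows merge before the side input is read.
merged = merge_sessions([session_window(3), session_window(5)])
print(merged, [side_input_window(w) for w in merged])  # [(3, 8)] [(6, 8)]

# e1 pre-combined alone in another bundle: its unmerged window is used.
print(side_input_window(session_window(3)))  # (4, 6)
```

The two lookups disagree only because pre-combining e1 happened before its window was merged; the element data is identical in both runs.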

*Solutions*
The optimal solution would be to defer until the trigger fires in the case
of merging windows with sideInputs that are not "agnostic" to such
behaviour, but this is clearly not feasible, since the nature and use of
sideInputs in CombineFns are opaque to the runner.
Second best would be to defer until the trigger fires *only* if sideInputs
are used with merging windows; I'm pretty sure this is how the Flink and
Dataflow runners (and soon the Spark runner) do it.
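
The second-best rule can be sketched as a predicate a runner might evaluate before lifting a combine. This is a hypothetical sketch: `CombineInfo` and `can_pre_combine` are illustrative names, not a runner API, and treating globally windowed side inputs as always safe is an assumption (a constant window mapping cannot change when windows merge).

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class CombineInfo:
    """Hypothetical summary of what a runner knows about a Combine.perKey."""
    merging_windows: bool  # e.g. Sessions on the main input
    # One flag per side input: True if it is globally windowed, i.e. every
    # main-input window maps to the same side-input window.
    side_inputs_global: List[bool] = field(default_factory=list)


def can_pre_combine(info: CombineInfo) -> bool:
    """True if combiner lifting is safe; False if combining must be
    deferred until windows have merged (i.e. until the trigger fires)."""
    if not info.merging_windows:
        # Each element's window is final, so side-input lookups are stable.
        return True
    # With merging windows, only a constant window mapping is safe.
    return all(info.side_inputs_global)


print(can_pre_combine(CombineInfo(merging_windows=True,
                                  side_inputs_global=[False])))  # False
```

Note that a merging WindowFn with no side inputs at all still returns True, since `all([])` is True: only the window-dependent side-input lookup makes lifting unsafe.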

*Tradeoffs*
This seems like a very user-friendly way to execute authored pipelines
correctly, but it also means that users who asked for a Combine
transformation will get a Grouping transformation instead (sort of the
opposite of combiner lifting? a combiner unwrapping?).
For the SDK, Combine is simply a composite transform, but keep in mind that
this affects runner optimization.
The price to pay here is (1) shuffling all elements into a single bundle
(the cost varies according to a runner's typical bundle size), and (2)
state that can grow, as processing is deferred and not compacted until the
trigger fires.
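
To make cost (2) concrete, here is a toy comparison of the state kept for one window, assuming a simple sum CombineFn (plain Python, not Beam; the function names are illustrative):

```python
from typing import Iterable, List


def lifted_state(values: Iterable[int]) -> List[int]:
    """Pre-combined (lifted) execution of a sum: state is one accumulator."""
    acc = 0
    for v in values:
        acc += v  # each element is folded into the accumulator immediately
    return [acc]


def deferred_state(values: Iterable[int]) -> List[int]:
    """Deferred execution: every element is buffered until the trigger fires."""
    return list(values)


events = range(1000)
lifted, deferred = lifted_state(events), deferred_state(events)
print(len(lifted), len(deferred))  # 1 1000
print(lifted[0] == sum(deferred))  # True: same result, different state size
```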

IMHO, the execution should remain faithful to what the pipeline states, and
if this results in errors, well... it happens.
There are many legitimate use cases where an actual GroupByKey should be
used (regardless of sideInputs), such as sequencing of events in a window,
and I don't see the difference here.

As stated above, I'm (almost) not recapping anyone's notes, as they are
persisted in BEAM-696, so if you had something to say, please provide your
input here.
I will note that Ben Chambers and Pei He mentioned that even with
deferring, this could still run into some non-determinism if there are
triggers controlling when we extract output, because non-merging windows'
trigger firing is non-deterministic.

Thanks,
Amit

Re: [DISCUSS] Deferring (pre) combine for merging windows.

Posted by Aljoscha Krettek <al...@apache.org>.
@Amit: Yes, Flink is more "what you write is what you get". For example, in
Flink we have a Fold function for windows which cannot be efficiently
computed with merging windows (it would require using a "group by" window
and then folding the iterable). We just don't allow this.

For Beam, I think it's ok if we clearly define Combine in terms of
GroupByKey | CombineValues (which we do). With different runners it's hard
to enforce common optimisation strategies.
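
The Fold limitation can be illustrated with a toy contrast between a fold and a CombineFn-style aggregation. This is plain Python, not the Flink or Beam API; `MeanFn` only borrows the method names of Beam's Python `CombineFn`.

```python
from functools import reduce


def fold(elements, init, step):
    """A fold: an initial value plus a step function, and no merge.
    The fold API offers no way to combine two partial folds, so a window
    that later merges has to keep its raw elements."""
    return reduce(step, elements, init)


class MeanFn:
    """CombineFn-style mean: the merge step lets partial results from
    windows that later merge be combined without the raw elements."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, acc, x):
        return (acc[0] + x, acc[1] + 1)

    def merge_accumulators(self, accs):
        return (sum(a[0] for a in accs), sum(a[1] for a in accs))

    def extract_output(self, acc):
        return acc[0] / acc[1] if acc[1] else float("nan")


fn = MeanFn()
# Two session windows, each pre-combined on its own, that later merge.
left = reduce(fn.add_input, [1, 2], fn.create_accumulator())
right = reduce(fn.add_input, [3, 4, 5], fn.create_accumulator())
print(fn.extract_output(fn.merge_accumulators([left, right])))  # 3.0

# The fold has to see all elements of the merged window at once.
print(fold([1, 2, 3, 4, 5], "", lambda acc, x: acc + str(x)))  # 12345
```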

On Sun, 23 Oct 2016 at 06:02 Robert Bradshaw <ro...@google.com.invalid>
wrote:

> On Sat, Oct 22, 2016 at 2:38 AM, Amit Sela <am...@gmail.com> wrote:
> > I understand the semantics, but I feel like there might be a different
> > point of view for open-source runners.
>
> It seems we're losing a major promise of the runner interchangeability
> story if different runners can give different results for a
> well-defined transformation. I strongly feel we should avoid that path
> whenever possible. Specifically in this case Combine.perKey should
> mean the same thing on all runners (namely its composite definition),
> and only be executed differently when it's safe to do so.
>
> > Dataflow is a service, and it tries to do its best to optimize execution
> > while users don't have to worry about the internal implementation (they
> > are not aware of it).
> > I can assure you that for Spark users, the difference between applying
> > groupByKey and combinePerKey is an important consideration
> > (<https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html>).
>
> For sure. Dataflow calls this out too. See the second star at
> https://cloud.google.com/dataflow/model/combine#using-combine-perkey
> (though it's not called out as prominently as it is for Spark
> users--likely should be more). Beam documentation should make this
> point as well.
>
> > @Aljoscha, do Flink users (working with the native Flink API) usually care
> > about this difference in implementation?
> > Can any other runners provide input?
>
> IIRC, Flink and Dataflow (and, trivially, the direct runner) all avoid
> this unsafe optimization when merging windows are mixed with
> non-global side inputs.
>
> Note also that the user of the Combine.perKey transform may not know
> the choice of windowing of the main or side inputs, so can't make this
> determination of whether it's safe to use this optimization. (As a
> concrete example, suppose I created a TopNPercent transform that did a
> global count and passed that as a side input to the Top CombineFn.)
>

Re: [DISCUSS] Deferring (pre) combine for merging windows.

Posted by Robert Bradshaw <ro...@google.com.INVALID>.
On Sat, Oct 22, 2016 at 2:38 AM, Amit Sela <am...@gmail.com> wrote:
> I understand the semantics, but I feel like there might be a different
> point of view for open-source runners.

It seems we're losing a major promise of the runner interchangeability
story if different runners can give different results for a
well-defined transformation. I strongly feel we should avoid that path
whenever possible. Specifically in this case Combine.perKey should
mean the same thing on all runners (namely its composite definition),
and only be executed differently when it's safe to do so.

> Dataflow is a service, and it tries to do its best to optimize execution
> while users don't have to worry about the internal implementation (they are
> not aware of it).
> I can assure you that for Spark users, the difference between applying
> groupByKey and combinePerKey is an important consideration
> (<https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html>).

For sure. Dataflow calls this out too. See the second star at
https://cloud.google.com/dataflow/model/combine#using-combine-perkey
(though it's not called out as prominently as it is for Spark
users--likely should be more). Beam documentation should make this
point as well.

> @Aljoscha, do Flink users (working with the native Flink API) usually care
> about this difference in implementation?
> Can any other runners provide input?

IIRC, Flink and Dataflow (and, trivially, the direct runner) all avoid
this unsafe optimization when merging windows are mixed with
non-global side inputs.

Note also that the user of the Combine.perKey transform may not know
the choice of windowing of the main or side inputs, so can't make this
determination of whether it's safe to use this optimization. (As a
concrete example, suppose I created a TopNPercent transform that did a
global count and passed that as a side input to the Top CombineFn.)
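
A toy version of that TopNPercent example shows that the combine's output depends on which side-input value it sees, which is exactly what the caller cannot control without knowing the windowing. This is plain Python; `top_percent` is a hypothetical stand-in for the Top CombineFn with a count side input.

```python
import heapq
import math
from typing import List


def top_percent(values: List[int], count: int, percent: float) -> List[int]:
    """Hypothetical Top-style combine: keep the largest `percent` of the
    elements, where `count` is the global element count supplied as a
    side input."""
    n = math.ceil(count * percent / 100)
    return heapq.nlargest(n, values)


values = [5, 1, 9, 3, 7, 2, 8, 6, 4, 10]
print(top_percent(values, count=len(values), percent=20))  # [10, 9]
# The same combine, fed a count from a different (wrong) window:
print(top_percent(values, count=5, percent=20))  # [10]
```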

Re: [DISCUSS] Deferring (pre) combine for merging windows.

Posted by Amit Sela <am...@gmail.com>.
I understand the semantics, but I feel like there might be a different
point of view for open-source runners.
Dataflow is a service, and it tries to do its best to optimize execution
while users don't have to worry about the internal implementation (they are
not aware of it).
I can assure you that for Spark users, the difference between applying
groupByKey and combinePerKey is an important consideration
(<https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html>).
@Aljoscha, do Flink users (working with the native Flink API) usually care
about this difference in implementation?
Can any other runners provide input?

On Sat, Oct 22, 2016 at 2:25 AM Robert Bradshaw <ro...@google.com.invalid>
wrote:

> Combine.perKey() is defined as GroupByKey() | Combine.values().
>
> A runner is free, in fact encouraged, to take advantage of the
> associative properties of CombineFn to compute the result of
> GroupByKey() | Combine.values() as cheaply as possible, but it is
> incorrect to produce something that could not have been produced by
> this composite implementation. (In the case of deterministic trigger
> firing (e.g. the default trigger), and assuming of course an
> associative, deterministic CombineFn, there is exactly one correct
> output for every input, no matter the WindowFns.)
>
> A corollary to this is that we cannot apply combining operations that
> inspect the main input window (including via side inputs whose window
> mapping is anything but a constant map, like mapping everything to the
> GlobalWindow) until the main input window is known.

Re: [DISCUSS] Deferring (pre) combine for merging windows.

Posted by Robert Bradshaw <ro...@google.com.INVALID>.
Combine.perKey() is defined as GroupByKey() | Combine.values().

A runner is free, in fact encouraged, to take advantage of the
associative properties of CombineFn to compute the result of
GroupByKey() | Combine.values() as cheaply as possible, but it is
incorrect to produce something that could not have been produced by
this composite implementation. (In the case of deterministic trigger
firing (e.g. the default trigger), and assuming of course an
associative, deterministic CombineFn, there is exactly one correct
output for every input, no matter the WindowFns.)

A corollary to this is that we cannot apply combining operations that
inspect the main input window (including via side inputs whose window
mapping is anything but a constant map, like mapping everything to the
GlobalWindow) until the main input window is known.
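
A minimal sketch of this definition, assuming no windows, no side inputs and an associative CombineFn (plain Python; `SumFn`, `composite` and `lifted` are illustrative names, not Beam APIs, though the method names follow Beam's Python `CombineFn`). The lifted execution is a valid optimization only because it yields the same result as the composite; once the CombineFn reads a window-dependent side input, as in the Sessions example from the original post, that equality no longer holds.

```python
from collections import defaultdict


class SumFn:
    """CombineFn-style sum (method names borrowed from Beam's Python SDK)."""

    def create_accumulator(self):
        return 0

    def add_input(self, acc, x):
        return acc + x

    def merge_accumulators(self, accs):
        return sum(accs)

    def extract_output(self, acc):
        return acc


def composite(pairs, fn):
    """GroupByKey() | Combine.values(): group everything, then combine."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    result = {}
    for k, vs in groups.items():
        acc = fn.create_accumulator()
        for v in vs:
            acc = fn.add_input(acc, v)
        result[k] = fn.extract_output(acc)
    return result


def lifted(bundles, fn):
    """Combiner lifting: pre-combine per bundle, shuffle only the
    accumulators, then merge them per key."""
    partials = defaultdict(list)
    for bundle in bundles:
        local = {}
        for k, v in bundle:
            acc = local[k] if k in local else fn.create_accumulator()
            local[k] = fn.add_input(acc, v)
        for k, acc in local.items():
            partials[k].append(acc)
    return {k: fn.extract_output(fn.merge_accumulators(accs))
            for k, accs in partials.items()}


bundles = [[("a", 1), ("b", 2)], [("a", 3)], [("b", 4), ("a", 5)]]
flat = [kv for bundle in bundles for kv in bundle]
print(composite(flat, SumFn()) == lifted(bundles, SumFn()))  # True
```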


On Fri, Oct 21, 2016 at 3:50 PM, Amit Sela <am...@gmail.com> wrote:
> Please excuse my typos and apply "s/differ/defer/g" ;-).
> Amit.

Re: [DISCUSS] Deferring (pre) combine for merging windows.

Posted by Amit Sela <am...@gmail.com>.
Please excuse my typos and apply "s/differ/defer/g" ;-).
Amit.
