You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2021/06/23 17:12:47 UTC

[DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Hi,

I'd like to discuss a very rough idea. I didn't walk through all the 
corner cases and the whole idea has a lot of rough edges, so please bear 
with me. I was thinking about non-IO applications of splittable DoFn, 
and the main idea - and why it is called splittable - is that it can 
handle unbounded outputs per element. Then I was thinking about what can 
generate unbounded outputs per element _without reading from external 
source_ (as that would be IO application) - and then I realized that the 
data can - at least theoretically - come from a downstream transform. It 
would have to be passed over an RPC (gRPC probably) connection, it would 
probably require some sort of service discovery - as the feedback loop 
would have to be correctly targeted based on key - and so on (those are 
the rough edges).

But supposing this can be solved - what iterations actually mean is the 
we have a side channel, that come from downstream processing - and we 
need a watermark estimator for this channel, that is able to hold the 
watermark back until the very last element (at a certain watermark) 
finishes the iteration. The idea is then we could - in theory - create 
an Iteration PTransform, that would take another PTransform (probably 
something like PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
IterationResult<K, V>>>, where the IterationResult<K, V> would contain 
the original KV<K, V> and a stopping condition (true, false) and by 
creating the feedback loop from the output of this PCollection we could 
actually implement this without any need of support on the side of runners.

Does that seem like something that might be worth exploring?

  Jan


Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Jan Lukavský <je...@seznam.cz>.
BTW, the iterations might break the (otherwise very useful) concept that 
elements arriving ON_TIME should stay ON_TIME throughout the complete 
computation. If an element has an excessive amount of iterations to 
complete, it _could_ be output late even though it would have arrived 
ON_TIME. But that would only mean, we might need two timeouts - one for 
releasing the watermark hold and another for cancelling the iteration 
completely.

On 6/23/21 10:43 PM, Jan Lukavský wrote:
>
> Right, one can "outsource" this functionality through external source, 
> but that is a sort-of hackish solution. The most serious problem is 
> that it "disconnects" the watermark of the feedback loop which can 
> make it tricky to correctly compute the downstream watermark. The SDF 
> approach seems to compute the watermark correctly (using per-key 
> watermark hold until the end of the cycle).
>
> On 6/23/21 10:25 PM, Luke Cwik wrote:
>> SDF isn't required as users already try to do things like this using 
>> UnboundedSource and Pubsub.
>>
>> On Wed, Jun 23, 2021 at 11:39 AM Reuven Lax <relax@google.com 
>> <ma...@google.com>> wrote:
>>
>>     This was explored in the past, though the design started getting
>>     very complex (watermarks of unbounded dimension, where each
>>     iteration has its own watermark dimension). At the time, the
>>     exploration petered out.
>>
>>     On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je.ik@seznam.cz
>>     <ma...@seznam.cz>> wrote:
>>
>>         Hi,
>>
>>         I'd like to discuss a very rough idea. I didn't walk through
>>         all the
>>         corner cases and the whole idea has a lot of rough edges, so
>>         please bear
>>         with me. I was thinking about non-IO applications of
>>         splittable DoFn,
>>         and the main idea - and why it is called splittable - is that
>>         it can
>>         handle unbounded outputs per element. Then I was thinking
>>         about what can
>>         generate unbounded outputs per element _without reading from
>>         external
>>         source_ (as that would be IO application) - and then I
>>         realized that the
>>         data can - at least theoretically - come from a downstream
>>         transform. It
>>         would have to be passed over an RPC (gRPC probably)
>>         connection, it would
>>         probably require some sort of service discovery - as the
>>         feedback loop
>>         would have to be correctly targeted based on key - and so on
>>         (those are
>>         the rough edges).
>>
>>         But supposing this can be solved - what iterations actually
>>         mean is the
>>         we have a side channel, that come from downstream processing
>>         - and we
>>         need a watermark estimator for this channel, that is able to
>>         hold the
>>         watermark back until the very last element (at a certain
>>         watermark)
>>         finishes the iteration. The idea is then we could - in theory
>>         - create
>>         an Iteration PTransform, that would take another PTransform
>>         (probably
>>         something like PTransform<PCollection<KV<K, V>>,
>>         PCollection<KV<K,
>>         IterationResult<K, V>>>, where the IterationResult<K, V>
>>         would contain
>>         the original KV<K, V> and a stopping condition (true, false)
>>         and by
>>         creating the feedback loop from the output of this
>>         PCollection we could
>>         actually implement this without any need of support on the
>>         side of runners.
>>
>>         Does that seem like something that might be worth exploring?
>>
>>           Jan
>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Jan Lukavský <je...@seznam.cz>.
Right, one can "outsource" this functionality through external source, 
but that is a sort-of hackish solution. The most serious problem is that 
it "disconnects" the watermark of the feedback loop which can make it 
tricky to correctly compute the downstream watermark. The SDF approach 
seems to compute the watermark correctly (using per-key watermark hold 
until the end of the cycle).

On 6/23/21 10:25 PM, Luke Cwik wrote:
> SDF isn't required as users already try to do things like this using 
> UnboundedSource and Pubsub.
>
> On Wed, Jun 23, 2021 at 11:39 AM Reuven Lax <relax@google.com 
> <ma...@google.com>> wrote:
>
>     This was explored in the past, though the design started getting
>     very complex (watermarks of unbounded dimension, where each
>     iteration has its own watermark dimension). At the time, the
>     exploration petered out.
>
>     On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je.ik@seznam.cz
>     <ma...@seznam.cz>> wrote:
>
>         Hi,
>
>         I'd like to discuss a very rough idea. I didn't walk through
>         all the
>         corner cases and the whole idea has a lot of rough edges, so
>         please bear
>         with me. I was thinking about non-IO applications of
>         splittable DoFn,
>         and the main idea - and why it is called splittable - is that
>         it can
>         handle unbounded outputs per element. Then I was thinking
>         about what can
>         generate unbounded outputs per element _without reading from
>         external
>         source_ (as that would be IO application) - and then I
>         realized that the
>         data can - at least theoretically - come from a downstream
>         transform. It
>         would have to be passed over an RPC (gRPC probably)
>         connection, it would
>         probably require some sort of service discovery - as the
>         feedback loop
>         would have to be correctly targeted based on key - and so on
>         (those are
>         the rough edges).
>
>         But supposing this can be solved - what iterations actually
>         mean is the
>         we have a side channel, that come from downstream processing -
>         and we
>         need a watermark estimator for this channel, that is able to
>         hold the
>         watermark back until the very last element (at a certain
>         watermark)
>         finishes the iteration. The idea is then we could - in theory
>         - create
>         an Iteration PTransform, that would take another PTransform
>         (probably
>         something like PTransform<PCollection<KV<K, V>>,
>         PCollection<KV<K,
>         IterationResult<K, V>>>, where the IterationResult<K, V> would
>         contain
>         the original KV<K, V> and a stopping condition (true, false)
>         and by
>         creating the feedback loop from the output of this PCollection
>         we could
>         actually implement this without any need of support on the
>         side of runners.
>
>         Does that seem like something that might be worth exploring?
>
>           Jan
>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Luke Cwik <lc...@google.com>.
SDF isn't required as users already try to do things like this using
UnboundedSource and Pubsub.

On Wed, Jun 23, 2021 at 11:39 AM Reuven Lax <re...@google.com> wrote:

> This was explored in the past, though the design started getting very
> complex (watermarks of unbounded dimension, where each iteration has its
> own watermark dimension). At the time, the exploration petered out.
>
> On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> I'd like to discuss a very rough idea. I didn't walk through all the
>> corner cases and the whole idea has a lot of rough edges, so please bear
>> with me. I was thinking about non-IO applications of splittable DoFn,
>> and the main idea - and why it is called splittable - is that it can
>> handle unbounded outputs per element. Then I was thinking about what can
>> generate unbounded outputs per element _without reading from external
>> source_ (as that would be IO application) - and then I realized that the
>> data can - at least theoretically - come from a downstream transform. It
>> would have to be passed over an RPC (gRPC probably) connection, it would
>> probably require some sort of service discovery - as the feedback loop
>> would have to be correctly targeted based on key - and so on (those are
>> the rough edges).
>>
>> But supposing this can be solved - what iterations actually mean is the
>> we have a side channel, that come from downstream processing - and we
>> need a watermark estimator for this channel, that is able to hold the
>> watermark back until the very last element (at a certain watermark)
>> finishes the iteration. The idea is then we could - in theory - create
>> an Iteration PTransform, that would take another PTransform (probably
>> something like PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
>> IterationResult<K, V>>>, where the IterationResult<K, V> would contain
>> the original KV<K, V> and a stopping condition (true, false) and by
>> creating the feedback loop from the output of this PCollection we could
>> actually implement this without any need of support on the side of
>> runners.
>>
>> Does that seem like something that might be worth exploring?
>>
>>   Jan
>>
>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Reuven Lax <re...@google.com>.
Yeah, exactly. Systems like Flink that have implemented iteration have (I
believe) simply disabled watermark propagation around cycles instead of
taking on the complexity of implementing vector watermarks.

On Thu, Jun 24, 2021 at 3:06 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I went through the papers and first of all, sorry, I misinterpreted the
> name, this is not specific to Cloud Dataflow. I think that the biggest
> issue is how to ensure that the iterative transform can make progress _even
> trough the watermark cannot make progress_ (chicken-egg problem, watermark
> can move after the cycle finishes, but the cycle finishes when the
> watermark moves). That is where the vector watermark must come in place and
> where all the stuff gets complicated.
>
> Thanks for this discussion, it helped me clarify some things.
>
>  Jan
> On 6/24/21 1:44 AM, Jan Lukavský wrote:
>
> Hi Kenn,
>
> thanks for the pointers, that is really interesting reading that probably
> could (should) be part of the Beam docs. On the other hand Beam is no
> longer Dataflow only - and that could mean that some of the concepts can be
> reiterated, possibly?
>
> I don't quite understand where is the difference of "source watermark" -
> where the "source SDF" can use output any downstream watermark - and an
> "iterative SDF", cannot ... this feels like it should be the same.
> On 6/24/21 12:47 AM, Kenneth Knowles wrote:
>
> Most of the theory is particularly well-treated in "Timely Dataflow" and
> "Differential Dataflow". There is a brief summary of the latter at
> https://blog.acolyer.org/2015/06/17/differential-dataflow/ but I
> recommend actually reading both papers. It uses clock ticks rather than
> Beam's continuous style of watermark, but I don't think this changes the
> general approach.
>
> There are very few implementations of watermark-correct cycles AFAIK. For
> Beam runners where the watermark is simulated (for example using Spark's
> state) we could possibly implement at the Beam layer. For engines where the
> Beam watermark is implemented more directly (for example Dataflow & Flink)
> there would be a lot of added complexity, probably performance loss, if it
> could be done at all.
>
> Kenn
>
> On Wed, Jun 23, 2021 at 3:31 PM Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> On 6/23/21 11:13 PM, Reuven Lax wrote:
>>>
>>>
>>>
>>> On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> The most qualitatively import use-case I see are ACID transactions -
>>>> transactions naturally involve cycles, because the most natural
>>>> implementation would be of something like "optimistic locking" where the
>>>> transaction is allowed to progress until a downstream "commit" sees a
>>>> conflict, when it needs to return the transaction back to the beginning
>>>> (pretty much the same as how git resolves conflict in a push).
>>>>
>>> True, however within a transform one could use timers to implement this
>>> (there are currently some bugs around looping timers I believe, but those
>>> are easier to fix than implementing a brand new programming model).
>>> Iterative is only really necessary if you need to iterate an entire
>>> subgraph, including GroupByKeys, etc.
>>>
>>> There necessarily needs to be GBK to support transactions. If we take
>>> the most prominent example of a transaction - moving some amount of cash
>>> between two bank accounts - we can have a state for the current amount of
>>> cash per user. This state would be keyed, request to transfer some amount
>>> would come as a atomic event "move amount X from user A to user B". This
>>> would lead to updates of state in keyed state of A and B, but we don't know
>>> if A or B have the required amount on their account. And this is where we
>>> need to do two GBKs - one will assign a sequential ID to each transaction
>>> (that is a serialized order) and the other downstream will verify that the
>>> result was computed from the correct data.
>>>
>>
>> Fair point. This could be done with state/timers in an
>> eventually-consistent way (though not fully ACID) by simply sending
>> messages. However in these sorts of workflow scenarios, the need for back
>> edges will probably come up regardless (e.g. if a failure happens and you
>> want to cancel, you might need a back edge to tell the previous key to
>> cancel).
>>
>> However I'm still not convinced watermarks are needed.
>>
>>> This is maybe too complex to describe in short, but there definitely has
>>> to be GBK (actually GroupAll operation) downstream and a cycle when a
>>> post-condition fails.
>>>
>>>
>>>
>>>> Another application would be graph algorithms on changing graphs, where
>>>> adding or removing an edge might trigger an iterative algorithm on the
>>>> graph (and I'm absolutely not sure that the discussed approach can do that,
>>>> this is just something, that would be cool to do :)).
>>>>
>>> Yes, that's what I had in mind. I'm just not sure that these algorithms
>>> lend themselves to windowing. I.e. if we added iterative support, but did
>>> not have support for windowing or watermarks across iterations, have we
>>> solved most of the problem?
>>>
>>> I don't think there is any windowing involved. When a new road is built
>>> between cities A and B it _immediately_ makes traveling between these two
>>> cities faster. There is no discrete boundary.
>>>
>>> I don't understand why we would drop support for watermarks - they would
>>> be perfectly supported, every iteration key will have a watermark hold that
>>> would be released when the key finished iterating - or was terminated due
>>> to timeout. I'm not sure if windowing as such plays any role in this, but
>>> maybe can.
>>>
>>
>> You'd have to make sure things don't deadlock. If a step inside the
>> transform that was being iterated had an event-time timer, what triggers
>> that timer? If that timer is triggered by the watermark of the previous
>> step and that watermark is being held up by the entire iteration, then this
>> timer will never fire and the whole transform could deadlock. This was one
>> reason for multi-dimensional watermarks - the timer can fire based on the
>> watermark from the previous iterations, and so never deadlocks (though
>> figuring out how to efficiently implement watermarks of unbounded
>> dimensionality might be difficult).
>>
>>
>>
>>
>>> YOn 6/23/21 10:53 PM, Reuven Lax wrote:
>>>>
>>>> One question I have is whether the use cases for cyclic graphs overlap
>>>> substantially with the use cases for event-time watermarks. Many of the
>>>> uses I'm aware of are ML-type algorithms (e.g. clustering) or iterative
>>>> algorithms on large graphs (connected components, etc.), and it's unclear
>>>> how many of these problems have a natural time component.
>>>>
>>>> On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Reuven, can you please elaborate a little on that? Why do you need
>>>>> watermark per iteration? Letting the watermark progress as soon as all the
>>>>> keys arriving before the upstream watermark terminate the cycle seems like
>>>>> a valid definition without the need to make the watermark multidimensional.
>>>>> Yes, it introduces (possibly unbounded) latency in downstream processing,
>>>>> but that is something that should be probably expected. The unboundness of
>>>>> the latency can be limited by either fixed timeout or number of iterations.
>>>>> On 6/23/21 8:39 PM, Reuven Lax wrote:
>>>>>
>>>>> This was explored in the past, though the design started getting very
>>>>> complex (watermarks of unbounded dimension, where each iteration has its
>>>>> own watermark dimension). At the time, the exploration petered out.
>>>>>
>>>>> On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'd like to discuss a very rough idea. I didn't walk through all the
>>>>>> corner cases and the whole idea has a lot of rough edges, so please
>>>>>> bear
>>>>>> with me. I was thinking about non-IO applications of splittable DoFn,
>>>>>> and the main idea - and why it is called splittable - is that it can
>>>>>> handle unbounded outputs per element. Then I was thinking about what
>>>>>> can
>>>>>> generate unbounded outputs per element _without reading from external
>>>>>> source_ (as that would be IO application) - and then I realized that
>>>>>> the
>>>>>> data can - at least theoretically - come from a downstream transform.
>>>>>> It
>>>>>> would have to be passed over an RPC (gRPC probably) connection, it
>>>>>> would
>>>>>> probably require some sort of service discovery - as the feedback
>>>>>> loop
>>>>>> would have to be correctly targeted based on key - and so on (those
>>>>>> are
>>>>>> the rough edges).
>>>>>>
>>>>>> But supposing this can be solved - what iterations actually mean is
>>>>>> the
>>>>>> we have a side channel, that come from downstream processing - and we
>>>>>> need a watermark estimator for this channel, that is able to hold the
>>>>>> watermark back until the very last element (at a certain watermark)
>>>>>> finishes the iteration. The idea is then we could - in theory -
>>>>>> create
>>>>>> an Iteration PTransform, that would take another PTransform (probably
>>>>>> something like PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
>>>>>> IterationResult<K, V>>>, where the IterationResult<K, V> would
>>>>>> contain
>>>>>> the original KV<K, V> and a stopping condition (true, false) and by
>>>>>> creating the feedback loop from the output of this PCollection we
>>>>>> could
>>>>>> actually implement this without any need of support on the side of
>>>>>> runners.
>>>>>>
>>>>>> Does that seem like something that might be worth exploring?
>>>>>>
>>>>>>   Jan
>>>>>>
>>>>>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

I went through the papers and first of all, sorry, I misinterpreted the 
name, this is not specific to Cloud Dataflow. I think that the biggest 
issue is how to ensure that the iterative transform can make progress 
_even trough the watermark cannot make progress_ (chicken-egg problem, 
watermark can move after the cycle finishes, but the cycle finishes when 
the watermark moves). That is where the vector watermark must come in 
place and where all the stuff gets complicated.

Thanks for this discussion, it helped me clarify some things.

  Jan

On 6/24/21 1:44 AM, Jan Lukavský wrote:
>
> Hi Kenn,
>
> thanks for the pointers, that is really interesting reading that 
> probably could (should) be part of the Beam docs. On the other hand 
> Beam is no longer Dataflow only - and that could mean that some of the 
> concepts can be reiterated, possibly?
>
> I don't quite understand where is the difference of "source watermark" 
> - where the "source SDF" can use output any downstream watermark - and 
> an "iterative SDF", cannot ... this feels like it should be the same.
>
> On 6/24/21 12:47 AM, Kenneth Knowles wrote:
>> Most of the theory is particularly well-treated in "Timely Dataflow" 
>> and "Differential Dataflow". There is a brief summary of the latter 
>> at https://blog.acolyer.org/2015/06/17/differential-dataflow/ 
>> <https://blog.acolyer.org/2015/06/17/differential-dataflow/> but I 
>> recommend actually reading both papers. It uses clock ticks rather 
>> than Beam's continuous style of watermark, but I don't think this 
>> changes the general approach.
>>
>> There are very few implementations of watermark-correct cycles AFAIK. 
>> For Beam runners where the watermark is simulated (for example using 
>> Spark's state) we could possibly implement at the Beam layer. For 
>> engines where the Beam watermark is implemented more directly (for 
>> example Dataflow & Flink) there would be a lot of added complexity, 
>> probably performance loss, if it could be done at all.
>>
>> Kenn
>>
>> On Wed, Jun 23, 2021 at 3:31 PM Reuven Lax <relax@google.com 
>> <ma...@google.com>> wrote:
>>
>>
>>
>>     On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský <je.ik@seznam.cz
>>     <ma...@seznam.cz>> wrote:
>>
>>         On 6/23/21 11:13 PM, Reuven Lax wrote:
>>>
>>>
>>>         On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský
>>>         <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>
>>>             The most qualitatively import use-case I see are ACID
>>>             transactions - transactions naturally involve cycles,
>>>             because the most natural implementation would be of
>>>             something like "optimistic locking" where the
>>>             transaction is allowed to progress until a downstream
>>>             "commit" sees a conflict, when it needs to return the
>>>             transaction back to the beginning (pretty much the same
>>>             as how git resolves conflict in a push).
>>>
>>>         True, however within a transform one could use timers to
>>>         implement this (there are currently some bugs around looping
>>>         timers I believe, but those are easier to fix than
>>>         implementing a brand new programming model). Iterative is
>>>         only really necessary if you need to iterate an entire
>>>         subgraph, including GroupByKeys, etc.
>>
>>         There necessarily needs to be GBK to support transactions. If
>>         we take the most prominent example of a transaction - moving
>>         some amount of cash between two bank accounts - we can have a
>>         state for the current amount of cash per user. This state
>>         would be keyed, request to transfer some amount would come as
>>         a atomic event "move amount X from user A to user B". This
>>         would lead to updates of state in keyed state of A and B, but
>>         we don't know if A or B have the required amount on their
>>         account. And this is where we need to do two GBKs - one will
>>         assign a sequential ID to each transaction (that is a
>>         serialized order) and the other downstream will verify that
>>         the result was computed from the correct data.
>>
>>
>>     Fair point. This could be done with state/timers in an
>>     eventually-consistent way (though not fully ACID) by simply
>>     sending messages. However in these sorts of workflow scenarios,
>>     the need for back edges will probably come up regardless (e.g. if
>>     a failure happens and you want to cancel, you might need a back
>>     edge to tell the previous key to cancel).
>>
>>     However I'm still not convinced watermarks are needed.
>>
>>         This is maybe too complex to describe in short, but there
>>         definitely has to be GBK (actually GroupAll operation)
>>         downstream and a cycle when a post-condition fails.
>>
>>>
>>>             Another application would be graph algorithms on
>>>             changing graphs, where adding or removing an edge might
>>>             trigger an iterative algorithm on the graph (and I'm
>>>             absolutely not sure that the discussed approach can do
>>>             that, this is just something, that would be cool to do :)).
>>>
>>>         Yes, that's what I had in mind. I'm just not sure that these
>>>         algorithms lend themselves to windowing. I.e. if we added
>>>         iterative support, but did not have support for windowing or
>>>         watermarks across iterations, have we solved most of the
>>>         problem?
>>
>>         I don't think there is any windowing involved. When a new
>>         road is built between cities A and B it _immediately_ makes
>>         traveling between these two cities faster. There is no
>>         discrete boundary.
>>
>>         I don't understand why we would drop support for watermarks -
>>         they would be perfectly supported, every iteration key will
>>         have a watermark hold that would be released when the key
>>         finished iterating - or was terminated due to timeout. I'm
>>         not sure if windowing as such plays any role in this, but
>>         maybe can.
>>
>>
>>     You'd have to make sure things don't deadlock. If a step inside
>>     the transform that was being iterated had an event-time timer,
>>     what triggers that timer? If that timer is triggered by the
>>     watermark of the previous step and that watermark is being held
>>     up by the entire iteration, then this timer will never fire and
>>     the whole transform could deadlock. This was one reason for
>>     multi-dimensional watermarks - the timer can fire based on the
>>     watermark from the previous iterations, and so never deadlocks
>>     (though figuring out how to efficiently implement watermarks of
>>     unbounded dimensionality might be difficult).
>>
>>
>>>             YOn 6/23/21 10:53 PM, Reuven Lax wrote:
>>>>             One question I have is whether the use cases for cyclic
>>>>             graphs overlap substantially with the use cases for
>>>>             event-time watermarks. Many of the uses I'm aware of
>>>>             are ML-type algorithms (e.g. clustering) or iterative
>>>>             algorithms on large graphs (connected components,
>>>>             etc.), and it's unclear how many of these problems have
>>>>             a natural time component.
>>>>
>>>>             On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský
>>>>             <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>>
>>>>                 Reuven, can you please elaborate a little on that?
>>>>                 Why do you need watermark per iteration? Letting
>>>>                 the watermark progress as soon as all the keys
>>>>                 arriving before the upstream watermark terminate
>>>>                 the cycle seems like a valid definition without the
>>>>                 need to make the watermark multidimensional. Yes,
>>>>                 it introduces (possibly unbounded) latency in
>>>>                 downstream processing, but that is something that
>>>>                 should be probably expected. The unboundness of the
>>>>                 latency can be limited by either fixed timeout or
>>>>                 number of iterations.
>>>>
>>>>                 On 6/23/21 8:39 PM, Reuven Lax wrote:
>>>>>                 This was explored in the past, though the design
>>>>>                 started getting very complex (watermarks of
>>>>>                 unbounded dimension, where each iteration has its
>>>>>                 own watermark dimension). At the time, the
>>>>>                 exploration petered out.
>>>>>
>>>>>                 On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský
>>>>>                 <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>>>
>>>>>                     Hi,
>>>>>
>>>>>                     I'd like to discuss a very rough idea. I
>>>>>                     didn't walk through all the
>>>>>                     corner cases and the whole idea has a lot of
>>>>>                     rough edges, so please bear
>>>>>                     with me. I was thinking about non-IO
>>>>>                     applications of splittable DoFn,
>>>>>                     and the main idea - and why it is called
>>>>>                     splittable - is that it can
>>>>>                     handle unbounded outputs per element. Then I
>>>>>                     was thinking about what can
>>>>>                     generate unbounded outputs per element
>>>>>                     _without reading from external
>>>>>                     source_ (as that would be IO application) -
>>>>>                     and then I realized that the
>>>>>                     data can - at least theoretically - come from
>>>>>                     a downstream transform. It
>>>>>                     would have to be passed over an RPC (gRPC
>>>>>                     probably) connection, it would
>>>>>                     probably require some sort of service
>>>>>                     discovery - as the feedback loop
>>>>>                     would have to be correctly targeted based on
>>>>>                     key - and so on (those are
>>>>>                     the rough edges).
>>>>>
>>>>>                     But supposing this can be solved - what
>>>>>                     iterations actually mean is the
>>>>>                     we have a side channel, that come from
>>>>>                     downstream processing - and we
>>>>>                     need a watermark estimator for this channel,
>>>>>                     that is able to hold the
>>>>>                     watermark back until the very last element (at
>>>>>                     a certain watermark)
>>>>>                     finishes the iteration. The idea is then we
>>>>>                     could - in theory - create
>>>>>                     an Iteration PTransform, that would take
>>>>>                     another PTransform (probably
>>>>>                     something like PTransform<PCollection<KV<K,
>>>>>                     V>>, PCollection<KV<K,
>>>>>                     IterationResult<K, V>>>, where the
>>>>>                     IterationResult<K, V> would contain
>>>>>                     the original KV<K, V> and a stopping condition
>>>>>                     (true, false) and by
>>>>>                     creating the feedback loop from the output of
>>>>>                     this PCollection we could
>>>>>                     actually implement this without any need of
>>>>>                     support on the side of runners.
>>>>>
>>>>>                     Does that seem like something that might be
>>>>>                     worth exploring?
>>>>>
>>>>>                       Jan
>>>>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Kenn,

thanks for the pointers, that is really interesting reading that 
probably could (should) be part of the Beam docs. On the other hand Beam 
is no longer Dataflow only - and that could mean that some of the 
concepts can be reiterated, possibly?

I don't quite understand where is the difference of "source watermark" - 
where the "source SDF" can use output any downstream watermark - and an 
"iterative SDF", cannot ... this feels like it should be the same.

On 6/24/21 12:47 AM, Kenneth Knowles wrote:
> Most of the theory is particularly well-treated in "Timely Dataflow" 
> and "Differential Dataflow". There is a brief summary of the latter at 
> https://blog.acolyer.org/2015/06/17/differential-dataflow/ 
> <https://blog.acolyer.org/2015/06/17/differential-dataflow/> but I 
> recommend actually reading both papers. It uses clock ticks rather 
> than Beam's continuous style of watermark, but I don't think this 
> changes the general approach.
>
> There are very few implementations of watermark-correct cycles AFAIK. 
> For Beam runners where the watermark is simulated (for example using 
> Spark's state) we could possibly implement at the Beam layer. For 
> engines where the Beam watermark is implemented more directly (for 
> example Dataflow & Flink) there would be a lot of added complexity, 
> probably performance loss, if it could be done at all.
>
> Kenn
>
> On Wed, Jun 23, 2021 at 3:31 PM Reuven Lax <relax@google.com 
> <ma...@google.com>> wrote:
>
>
>
>     On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský <je.ik@seznam.cz
>     <ma...@seznam.cz>> wrote:
>
>         On 6/23/21 11:13 PM, Reuven Lax wrote:
>>
>>
>>         On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je.ik@seznam.cz
>>         <ma...@seznam.cz>> wrote:
>>
>>             The most qualitatively import use-case I see are ACID
>>             transactions - transactions naturally involve cycles,
>>             because the most natural implementation would be of
>>             something like "optimistic locking" where the transaction
>>             is allowed to progress until a downstream "commit" sees a
>>             conflict, when it needs to return the transaction back to
>>             the beginning (pretty much the same as how git resolves
>>             conflict in a push).
>>
>>         True, however within a transform one could use timers to
>>         implement this (there are currently some bugs around looping
>>         timers I believe, but those are easier to fix than
>>         implementing a brand new programming model). Iterative is
>>         only really necessary if you need to iterate an entire
>>         subgraph, including GroupByKeys, etc.
>
>         There necessarily needs to be GBK to support transactions. If
>         we take the most prominent example of a transaction - moving
>         some amount of cash between two bank accounts - we can have a
>         state for the current amount of cash per user. This state
>         would be keyed, request to transfer some amount would come as
>         a atomic event "move amount X from user A to user B". This
>         would lead to updates of state in keyed state of A and B, but
>         we don't know if A or B have the required amount on their
>         account. And this is where we need to do two GBKs - one will
>         assign a sequential ID to each transaction (that is a
>         serialized order) and the other downstream will verify that
>         the result was computed from the correct data.
>
>
>     Fair point. This could be done with state/timers in an
>     eventually-consistent way (though not fully ACID) by simply
>     sending messages. However in these sorts of workflow scenarios,
>     the need for back edges will probably come up regardless (e.g. if
>     a failure happens and you want to cancel, you might need a back
>     edge to tell the previous key to cancel).
>
>     However I'm still not convinced watermarks are needed.
>
>         This is maybe too complex to describe in short, but there
>         definitely has to be GBK (actually GroupAll operation)
>         downstream and a cycle when a post-condition fails.
>
>>
>>             Another application would be graph algorithms on changing
>>             graphs, where adding or removing an edge might trigger an
>>             iterative algorithm on the graph (and I'm absolutely not
>>             sure that the discussed approach can do that, this is
>>             just something, that would be cool to do :)).
>>
>>         Yes, that's what I had in mind. I'm just not sure that these
>>         algorithms lend themselves to windowing. I.e. if we added
>>         iterative support, but did not have support for windowing or
>>         watermarks across iterations, have we solved most of the
>>         problem?
>
>         I don't think there is any windowing involved. When a new road
>         is built between cities A and B it _immediately_ makes
>         traveling between these two cities faster. There is no
>         discrete boundary.
>
>         I don't understand why we would drop support for watermarks -
>         they would be perfectly supported, every iteration key will
>         have a watermark hold that would be released when the key
>         finished iterating - or was terminated due to timeout. I'm not
>         sure if windowing as such plays any role in this, but maybe can.
>
>
>     You'd have to make sure things don't deadlock. If a step inside
>     the transform that was being iterated had an event-time timer,
>     what triggers that timer? If that timer is triggered by the
>     watermark of the previous step and that watermark is being held up
>     by the entire iteration, then this timer will never fire and the
>     whole transform could deadlock. This was one reason for
>     multi-dimensional watermarks - the timer can fire based on the
>     watermark from the previous iterations, and so never deadlocks
>     (though figuring out how to efficiently implement watermarks of
>     unbounded dimensionality might be difficult).
>
>
>>             YOn 6/23/21 10:53 PM, Reuven Lax wrote:
>>>             One question I have is whether the use cases for cyclic
>>>             graphs overlap substantially with the use cases for
>>>             event-time watermarks. Many of the uses I'm aware of are
>>>             ML-type algorithms (e.g. clustering) or iterative
>>>             algorithms on large graphs (connected components, etc.),
>>>             and it's unclear how many of these problems have a
>>>             natural time component.
>>>
>>>             On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský
>>>             <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>
>>>                 Reuven, can you please elaborate a little on that?
>>>                 Why do you need watermark per iteration? Letting the
>>>                 watermark progress as soon as all the keys arriving
>>>                 before the upstream watermark terminate the cycle
>>>                 seems like a valid definition without the need to
>>>                 make the watermark multidimensional. Yes, it
>>>                 introduces (possibly unbounded) latency in
>>>                 downstream processing, but that is something that
>>>                 should be probably expected. The unboundness of the
>>>                 latency can be limited by either fixed timeout or
>>>                 number of iterations.
>>>
>>>                 On 6/23/21 8:39 PM, Reuven Lax wrote:
>>>>                 This was explored in the past, though the design
>>>>                 started getting very complex (watermarks of
>>>>                 unbounded dimension, where each iteration has its
>>>>                 own watermark dimension). At the time, the
>>>>                 exploration petered out.
>>>>
>>>>                 On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský
>>>>                 <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>>
>>>>                     Hi,
>>>>
>>>>                     I'd like to discuss a very rough idea. I didn't
>>>>                     walk through all the
>>>>                     corner cases and the whole idea has a lot of
>>>>                     rough edges, so please bear
>>>>                     with me. I was thinking about non-IO
>>>>                     applications of splittable DoFn,
>>>>                     and the main idea - and why it is called
>>>>                     splittable - is that it can
>>>>                     handle unbounded outputs per element. Then I
>>>>                     was thinking about what can
>>>>                     generate unbounded outputs per element _without
>>>>                     reading from external
>>>>                     source_ (as that would be IO application) - and
>>>>                     then I realized that the
>>>>                     data can - at least theoretically - come from a
>>>>                     downstream transform. It
>>>>                     would have to be passed over an RPC (gRPC
>>>>                     probably) connection, it would
>>>>                     probably require some sort of service discovery
>>>>                     - as the feedback loop
>>>>                     would have to be correctly targeted based on
>>>>                     key - and so on (those are
>>>>                     the rough edges).
>>>>
>>>>                     But supposing this can be solved - what
>>>>                     iterations actually mean is the
>>>>                     we have a side channel, that come from
>>>>                     downstream processing - and we
>>>>                     need a watermark estimator for this channel,
>>>>                     that is able to hold the
>>>>                     watermark back until the very last element (at
>>>>                     a certain watermark)
>>>>                     finishes the iteration. The idea is then we
>>>>                     could - in theory - create
>>>>                     an Iteration PTransform, that would take
>>>>                     another PTransform (probably
>>>>                     something like PTransform<PCollection<KV<K,
>>>>                     V>>, PCollection<KV<K,
>>>>                     IterationResult<K, V>>>, where the
>>>>                     IterationResult<K, V> would contain
>>>>                     the original KV<K, V> and a stopping condition
>>>>                     (true, false) and by
>>>>                     creating the feedback loop from the output of
>>>>                     this PCollection we could
>>>>                     actually implement this without any need of
>>>>                     support on the side of runners.
>>>>
>>>>                     Does that seem like something that might be
>>>>                     worth exploring?
>>>>
>>>>                       Jan
>>>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Kenneth Knowles <ke...@apache.org>.
Most of the theory is particularly well-treated in "Timely Dataflow" and
"Differential Dataflow". There is a brief summary of the latter at
https://blog.acolyer.org/2015/06/17/differential-dataflow/ but I recommend
actually reading both papers. It uses clock ticks rather than Beam's
continuous style of watermark, but I don't think this changes the general
approach.

There are very few implementations of watermark-correct cycles AFAIK. For
Beam runners where the watermark is simulated (for example using Spark's
state) we could possibly implement at the Beam layer. For engines where the
Beam watermark is implemented more directly (for example Dataflow & Flink)
there would be a lot of added complexity, probably performance loss, if it
could be done at all.

Kenn

On Wed, Jun 23, 2021 at 3:31 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> On 6/23/21 11:13 PM, Reuven Lax wrote:
>>
>>
>>
>> On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> The most qualitatively import use-case I see are ACID transactions -
>>> transactions naturally involve cycles, because the most natural
>>> implementation would be of something like "optimistic locking" where the
>>> transaction is allowed to progress until a downstream "commit" sees a
>>> conflict, when it needs to return the transaction back to the beginning
>>> (pretty much the same as how git resolves conflict in a push).
>>>
>> True, however within a transform one could use timers to implement this
>> (there are currently some bugs around looping timers I believe, but those
>> are easier to fix than implementing a brand new programming model).
>> Iterative is only really necessary if you need to iterate an entire
>> subgraph, including GroupByKeys, etc.
>>
>> There necessarily needs to be GBK to support transactions. If we take the
>> most prominent example of a transaction - moving some amount of cash
>> between two bank accounts - we can have a state for the current amount of
>> cash per user. This state would be keyed, request to transfer some amount
>> would come as a atomic event "move amount X from user A to user B". This
>> would lead to updates of state in keyed state of A and B, but we don't know
>> if A or B have the required amount on their account. And this is where we
>> need to do two GBKs - one will assign a sequential ID to each transaction
>> (that is a serialized order) and the other downstream will verify that the
>> result was computed from the correct data.
>>
>
> Fair point. This could be done with state/timers in an
> eventually-consistent way (though not fully ACID) by simply sending
> messages. However in these sorts of workflow scenarios, the need for back
> edges will probably come up regardless (e.g. if a failure happens and you
> want to cancel, you might need a back edge to tell the previous key to
> cancel).
>
> However I'm still not convinced watermarks are needed.
>
>> This is maybe too complex to describe in short, but there definitely has
>> to be GBK (actually GroupAll operation) downstream and a cycle when a
>> post-condition fails.
>>
>>
>>
>>> Another application would be graph algorithms on changing graphs, where
>>> adding or removing an edge might trigger an iterative algorithm on the
>>> graph (and I'm absolutely not sure that the discussed approach can do that,
>>> this is just something, that would be cool to do :)).
>>>
>> Yes, that's what I had in mind. I'm just not sure that these algorithms
>> lend themselves to windowing. I.e. if we added iterative support, but did
>> not have support for windowing or watermarks across iterations, have we
>> solved most of the problem?
>>
>> I don't think there is any windowing involved. When a new road is built
>> between cities A and B it _immediately_ makes traveling between these two
>> cities faster. There is no discrete boundary.
>>
>> I don't understand why we would drop support for watermarks - they would
>> be perfectly supported, every iteration key will have a watermark hold that
>> would be released when the key finished iterating - or was terminated due
>> to timeout. I'm not sure if windowing as such plays any role in this, but
>> maybe can.
>>
>
> You'd have to make sure things don't deadlock. If a step inside the
> transform that was being iterated had an event-time timer, what triggers
> that timer? If that timer is triggered by the watermark of the previous
> step and that watermark is being held up by the entire iteration, then this
> timer will never fire and the whole transform could deadlock. This was one
> reason for multi-dimensional watermarks - the timer can fire based on the
> watermark from the previous iterations, and so never deadlocks (though
> figuring out how to efficiently implement watermarks of unbounded
> dimensionality might be difficult).
>
>
>
>
>> YOn 6/23/21 10:53 PM, Reuven Lax wrote:
>>>
>>> One question I have is whether the use cases for cyclic graphs overlap
>>> substantially with the use cases for event-time watermarks. Many of the
>>> uses I'm aware of are ML-type algorithms (e.g. clustering) or iterative
>>> algorithms on large graphs (connected components, etc.), and it's unclear
>>> how many of these problems have a natural time component.
>>>
>>> On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Reuven, can you please elaborate a little on that? Why do you need
>>>> watermark per iteration? Letting the watermark progress as soon as all the
>>>> keys arriving before the upstream watermark terminate the cycle seems like
>>>> a valid definition without the need to make the watermark multidimensional.
>>>> Yes, it introduces (possibly unbounded) latency in downstream processing,
>>>> but that is something that should be probably expected. The unboundness of
>>>> the latency can be limited by either fixed timeout or number of iterations.
>>>> On 6/23/21 8:39 PM, Reuven Lax wrote:
>>>>
>>>> This was explored in the past, though the design started getting very
>>>> complex (watermarks of unbounded dimension, where each iteration has its
>>>> own watermark dimension). At the time, the exploration petered out.
>>>>
>>>> On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'd like to discuss a very rough idea. I didn't walk through all the
>>>>> corner cases and the whole idea has a lot of rough edges, so please
>>>>> bear
>>>>> with me. I was thinking about non-IO applications of splittable DoFn,
>>>>> and the main idea - and why it is called splittable - is that it can
>>>>> handle unbounded outputs per element. Then I was thinking about what
>>>>> can
>>>>> generate unbounded outputs per element _without reading from external
>>>>> source_ (as that would be IO application) - and then I realized that
>>>>> the
>>>>> data can - at least theoretically - come from a downstream transform.
>>>>> It
>>>>> would have to be passed over an RPC (gRPC probably) connection, it
>>>>> would
>>>>> probably require some sort of service discovery - as the feedback loop
>>>>> would have to be correctly targeted based on key - and so on (those
>>>>> are
>>>>> the rough edges).
>>>>>
>>>>> But supposing this can be solved - what iterations actually mean is
>>>>> the
>>>>> we have a side channel, that come from downstream processing - and we
>>>>> need a watermark estimator for this channel, that is able to hold the
>>>>> watermark back until the very last element (at a certain watermark)
>>>>> finishes the iteration. The idea is then we could - in theory - create
>>>>> an Iteration PTransform, that would take another PTransform (probably
>>>>> something like PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
>>>>> IterationResult<K, V>>>, where the IterationResult<K, V> would contain
>>>>> the original KV<K, V> and a stopping condition (true, false) and by
>>>>> creating the feedback loop from the output of this PCollection we
>>>>> could
>>>>> actually implement this without any need of support on the side of
>>>>> runners.
>>>>>
>>>>> Does that seem like something that might be worth exploring?
>>>>>
>>>>>   Jan
>>>>>
>>>>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Jan Lukavský <je...@seznam.cz>.
 > If that timer is triggered by the watermark of the previous step and 
that watermark is being held up by the entire iteration, then this timer 
will never fire and the whole transform could deadlock. This was one 
reason for multi-dimensional watermarks - the timer can fire based on 
the watermark from the previous iterations, and so never deadlocks 
(though figuring out how to efficiently implement watermarks of 
unbounded dimensionality might be difficult).

That makes sense. However, if you implement endless "for" cycle, that 
will cycle for ever - is that something that the tool you are using 
should avoid? Should we ban for-cycles just because it can lead to 
infinite loops? Another important detail - there is no change in the 
Beam model needed. The purpose of this thread is - are we already there? 
Is really SDF filling the gap in both the source, but as well the 
iteration "gap"? Iteration is for the time being the domain of batch, 
which is where the unified approach looses its points.

On 6/24/21 12:31 AM, Reuven Lax wrote:
>
>
> On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     On 6/23/21 11:13 PM, Reuven Lax wrote:
>>
>>
>>     On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je.ik@seznam.cz
>>     <ma...@seznam.cz>> wrote:
>>
>>         The most qualitatively import use-case I see are ACID
>>         transactions - transactions naturally involve cycles, because
>>         the most natural implementation would be of something like
>>         "optimistic locking" where the transaction is allowed to
>>         progress until a downstream "commit" sees a conflict, when it
>>         needs to return the transaction back to the beginning (pretty
>>         much the same as how git resolves conflict in a push).
>>
>>     True, however within a transform one could use timers to
>>     implement this (there are currently some bugs around looping
>>     timers I believe, but those are easier to fix than implementing a
>>     brand new programming model). Iterative is only really necessary
>>     if you need to iterate an entire subgraph, including GroupByKeys,
>>     etc.
>
>     There necessarily needs to be GBK to support transactions. If we
>     take the most prominent example of a transaction - moving some
>     amount of cash between two bank accounts - we can have a state for
>     the current amount of cash per user. This state would be keyed,
>     request to transfer some amount would come as a atomic event "move
>     amount X from user A to user B". This would lead to updates of
>     state in keyed state of A and B, but we don't know if A or B have
>     the required amount on their account. And this is where we need to
>     do two GBKs - one will assign a sequential ID to each transaction
>     (that is a serialized order) and the other downstream will verify
>     that the result was computed from the correct data.
>
>
> Fair point. This could be done with state/timers in an 
> eventually-consistent way (though not fully ACID) by simply sending 
> messages. However in these sorts of workflow scenarios, the need for 
> back edges will probably come up regardless (e.g. if a failure happens 
> and you want to cancel, you might need a back edge to tell the 
> previous key to cancel).
>
> However I'm still not convinced watermarks are needed.
>
>     This is maybe too complex to describe in short, but there
>     definitely has to be GBK (actually GroupAll operation) downstream
>     and a cycle when a post-condition fails.
>
>>
>>         Another application would be graph algorithms on changing
>>         graphs, where adding or removing an edge might trigger an
>>         iterative algorithm on the graph (and I'm absolutely not sure
>>         that the discussed approach can do that, this is just
>>         something, that would be cool to do :)).
>>
>>     Yes, that's what I had in mind. I'm just not sure that these
>>     algorithms lend themselves to windowing. I.e. if we added
>>     iterative support, but did not have support for windowing or
>>     watermarks across iterations, have we solved most of the problem?
>
>     I don't think there is any windowing involved. When a new road is
>     built between cities A and B it _immediately_ makes traveling
>     between these two cities faster. There is no discrete boundary.
>
>     I don't understand why we would drop support for watermarks - they
>     would be perfectly supported, every iteration key will have a
>     watermark hold that would be released when the key finished
>     iterating - or was terminated due to timeout. I'm not sure if
>     windowing as such plays any role in this, but maybe can.
>
>
> You'd have to make sure things don't deadlock. If a step inside the 
> transform that was being iterated had an event-time timer, what 
> triggers that timer? If that timer is triggered by the watermark of 
> the previous step and that watermark is being held up by the entire 
> iteration, then this timer will never fire and the whole transform 
> could deadlock. This was one reason for multi-dimensional watermarks - 
> the timer can fire based on the watermark from the previous 
> iterations, and so never deadlocks (though figuring out how to 
> efficiently implement watermarks of unbounded dimensionality might be 
> difficult).
>
>
>>         YOn 6/23/21 10:53 PM, Reuven Lax wrote:
>>>         One question I have is whether the use cases for cyclic
>>>         graphs overlap substantially with the use cases for
>>>         event-time watermarks. Many of the uses I'm aware of are
>>>         ML-type algorithms (e.g. clustering) or iterative algorithms
>>>         on large graphs (connected components, etc.), and it's
>>>         unclear how many of these problems have a natural time
>>>         component.
>>>
>>>         On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský
>>>         <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>
>>>             Reuven, can you please elaborate a little on that? Why
>>>             do you need watermark per iteration? Letting the
>>>             watermark progress as soon as all the keys arriving
>>>             before the upstream watermark terminate the cycle seems
>>>             like a valid definition without the need to make the
>>>             watermark multidimensional. Yes, it introduces (possibly
>>>             unbounded) latency in downstream processing, but that is
>>>             something that should be probably expected. The
>>>             unboundness of the latency can be limited by either
>>>             fixed timeout or number of iterations.
>>>
>>>             On 6/23/21 8:39 PM, Reuven Lax wrote:
>>>>             This was explored in the past, though the design
>>>>             started getting very complex (watermarks of unbounded
>>>>             dimension, where each iteration has its own watermark
>>>>             dimension). At the time, the exploration petered out.
>>>>
>>>>             On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský
>>>>             <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>>
>>>>                 Hi,
>>>>
>>>>                 I'd like to discuss a very rough idea. I didn't
>>>>                 walk through all the
>>>>                 corner cases and the whole idea has a lot of rough
>>>>                 edges, so please bear
>>>>                 with me. I was thinking about non-IO applications
>>>>                 of splittable DoFn,
>>>>                 and the main idea - and why it is called splittable
>>>>                 - is that it can
>>>>                 handle unbounded outputs per element. Then I was
>>>>                 thinking about what can
>>>>                 generate unbounded outputs per element _without
>>>>                 reading from external
>>>>                 source_ (as that would be IO application) - and
>>>>                 then I realized that the
>>>>                 data can - at least theoretically - come from a
>>>>                 downstream transform. It
>>>>                 would have to be passed over an RPC (gRPC probably)
>>>>                 connection, it would
>>>>                 probably require some sort of service discovery -
>>>>                 as the feedback loop
>>>>                 would have to be correctly targeted based on key -
>>>>                 and so on (those are
>>>>                 the rough edges).
>>>>
>>>>                 But supposing this can be solved - what iterations
>>>>                 actually mean is the
>>>>                 we have a side channel, that come from downstream
>>>>                 processing - and we
>>>>                 need a watermark estimator for this channel, that
>>>>                 is able to hold the
>>>>                 watermark back until the very last element (at a
>>>>                 certain watermark)
>>>>                 finishes the iteration. The idea is then we could -
>>>>                 in theory - create
>>>>                 an Iteration PTransform, that would take another
>>>>                 PTransform (probably
>>>>                 something like PTransform<PCollection<KV<K, V>>,
>>>>                 PCollection<KV<K,
>>>>                 IterationResult<K, V>>>, where the
>>>>                 IterationResult<K, V> would contain
>>>>                 the original KV<K, V> and a stopping condition
>>>>                 (true, false) and by
>>>>                 creating the feedback loop from the output of this
>>>>                 PCollection we could
>>>>                 actually implement this without any need of support
>>>>                 on the side of runners.
>>>>
>>>>                 Does that seem like something that might be worth
>>>>                 exploring?
>>>>
>>>>                   Jan
>>>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Reuven Lax <re...@google.com>.
On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský <je...@seznam.cz> wrote:

> On 6/23/21 11:13 PM, Reuven Lax wrote:
>
>
>
> On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> The most qualitatively import use-case I see are ACID transactions -
>> transactions naturally involve cycles, because the most natural
>> implementation would be of something like "optimistic locking" where the
>> transaction is allowed to progress until a downstream "commit" sees a
>> conflict, when it needs to return the transaction back to the beginning
>> (pretty much the same as how git resolves conflict in a push).
>>
> True, however within a transform one could use timers to implement this
> (there are currently some bugs around looping timers I believe, but those
> are easier to fix than implementing a brand new programming model).
> Iterative is only really necessary if you need to iterate an entire
> subgraph, including GroupByKeys, etc.
>
> There necessarily needs to be GBK to support transactions. If we take the
> most prominent example of a transaction - moving some amount of cash
> between two bank accounts - we can have a state for the current amount of
> cash per user. This state would be keyed, request to transfer some amount
> would come as a atomic event "move amount X from user A to user B". This
> would lead to updates of state in keyed state of A and B, but we don't know
> if A or B have the required amount on their account. And this is where we
> need to do two GBKs - one will assign a sequential ID to each transaction
> (that is a serialized order) and the other downstream will verify that the
> result was computed from the correct data.
>

Fair point. This could be done with state/timers in an
eventually-consistent way (though not fully ACID) by simply sending
messages. However in these sorts of workflow scenarios, the need for back
edges will probably come up regardless (e.g. if a failure happens and you
want to cancel, you might need a back edge to tell the previous key to
cancel).

However I'm still not convinced watermarks are needed.

> This is maybe too complex to describe in short, but there definitely has
> to be GBK (actually GroupAll operation) downstream and a cycle when a
> post-condition fails.
>
>
>
>> Another application would be graph algorithms on changing graphs, where
>> adding or removing an edge might trigger an iterative algorithm on the
>> graph (and I'm absolutely not sure that the discussed approach can do that,
>> this is just something, that would be cool to do :)).
>>
> Yes, that's what I had in mind. I'm just not sure that these algorithms
> lend themselves to windowing. I.e. if we added iterative support, but did
> not have support for windowing or watermarks across iterations, have we
> solved most of the problem?
>
> I don't think there is any windowing involved. When a new road is built
> between cities A and B it _immediately_ makes traveling between these two
> cities faster. There is no discrete boundary.
>
> I don't understand why we would drop support for watermarks - they would
> be perfectly supported, every iteration key will have a watermark hold that
> would be released when the key finished iterating - or was terminated due
> to timeout. I'm not sure if windowing as such plays any role in this, but
> maybe can.
>

You'd have to make sure things don't deadlock. If a step inside the
transform that was being iterated had an event-time timer, what triggers
that timer? If that timer is triggered by the watermark of the previous
step and that watermark is being held up by the entire iteration, then this
timer will never fire and the whole transform could deadlock. This was one
reason for multi-dimensional watermarks - the timer can fire based on the
watermark from the previous iterations, and so never deadlocks (though
figuring out how to efficiently implement watermarks of unbounded
dimensionality might be difficult).




> YOn 6/23/21 10:53 PM, Reuven Lax wrote:
>>
>> One question I have is whether the use cases for cyclic graphs overlap
>> substantially with the use cases for event-time watermarks. Many of the
>> uses I'm aware of are ML-type algorithms (e.g. clustering) or iterative
>> algorithms on large graphs (connected components, etc.), and it's unclear
>> how many of these problems have a natural time component.
>>
>> On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Reuven, can you please elaborate a little on that? Why do you need
>>> watermark per iteration? Letting the watermark progress as soon as all the
>>> keys arriving before the upstream watermark terminate the cycle seems like
>>> a valid definition without the need to make the watermark multidimensional.
>>> Yes, it introduces (possibly unbounded) latency in downstream processing,
>>> but that is something that should be probably expected. The unboundness of
>>> the latency can be limited by either fixed timeout or number of iterations.
>>> On 6/23/21 8:39 PM, Reuven Lax wrote:
>>>
>>> This was explored in the past, though the design started getting very
>>> complex (watermarks of unbounded dimension, where each iteration has its
>>> own watermark dimension). At the time, the exploration petered out.
>>>
>>> On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'd like to discuss a very rough idea. I didn't walk through all the
>>>> corner cases and the whole idea has a lot of rough edges, so please
>>>> bear
>>>> with me. I was thinking about non-IO applications of splittable DoFn,
>>>> and the main idea - and why it is called splittable - is that it can
>>>> handle unbounded outputs per element. Then I was thinking about what
>>>> can
>>>> generate unbounded outputs per element _without reading from external
>>>> source_ (as that would be IO application) - and then I realized that
>>>> the
>>>> data can - at least theoretically - come from a downstream transform.
>>>> It
>>>> would have to be passed over an RPC (gRPC probably) connection, it
>>>> would
>>>> probably require some sort of service discovery - as the feedback loop
>>>> would have to be correctly targeted based on key - and so on (those are
>>>> the rough edges).
>>>>
>>>> But supposing this can be solved - what iterations actually mean is the
>>>> we have a side channel, that come from downstream processing - and we
>>>> need a watermark estimator for this channel, that is able to hold the
>>>> watermark back until the very last element (at a certain watermark)
>>>> finishes the iteration. The idea is then we could - in theory - create
>>>> an Iteration PTransform, that would take another PTransform (probably
>>>> something like PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
>>>> IterationResult<K, V>>>, where the IterationResult<K, V> would contain
>>>> the original KV<K, V> and a stopping condition (true, false) and by
>>>> creating the feedback loop from the output of this PCollection we could
>>>> actually implement this without any need of support on the side of
>>>> runners.
>>>>
>>>> Does that seem like something that might be worth exploring?
>>>>
>>>>   Jan
>>>>
>>>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Jan Lukavský <je...@seznam.cz>.
On 6/23/21 11:13 PM, Reuven Lax wrote:
>
>
> On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     The most qualitatively import use-case I see are ACID transactions
>     - transactions naturally involve cycles, because the most natural
>     implementation would be of something like "optimistic locking"
>     where the transaction is allowed to progress until a downstream
>     "commit" sees a conflict, when it needs to return the transaction
>     back to the beginning (pretty much the same as how git resolves
>     conflict in a push).
>
> True, however within a transform one could use timers to implement 
> this (there are currently some bugs around looping timers I believe, 
> but those are easier to fix than implementing a brand new programming 
> model). Iterative is only really necessary if you need to iterate an 
> entire subgraph, including GroupByKeys, etc.

There necessarily needs to be GBK to support transactions. If we take 
the most prominent example of a transaction - moving some amount of cash 
between two bank accounts - we can have a state for the current amount 
of cash per user. This state would be keyed, request to transfer some 
amount would come as a atomic event "move amount X from user A to user 
B". This would lead to updates of state in keyed state of A and B, but 
we don't know if A or B have the required amount on their account. And 
this is where we need to do two GBKs - one will assign a sequential ID 
to each transaction (that is a serialized order) and the other 
downstream will verify that the result was computed from the correct data.

This is maybe too complex to describe in short, but there definitely has 
to be GBK (actually GroupAll operation) downstream and a cycle when a 
post-condition fails.

>
>     Another application would be graph algorithms on changing graphs,
>     where adding or removing an edge might trigger an iterative
>     algorithm on the graph (and I'm absolutely not sure that the
>     discussed approach can do that, this is just something, that would
>     be cool to do :)).
>
> Yes, that's what I had in mind. I'm just not sure that these 
> algorithms lend themselves to windowing. I.e. if we added iterative 
> support, but did not have support for windowing or watermarks across 
> iterations, have we solved most of the problem?

I don't think there is any windowing involved. When a new road is built 
between cities A and B it _immediately_ makes traveling between these 
two cities faster. There is no discrete boundary.

I don't understand why we would drop support for watermarks - they would 
be perfectly supported, every iteration key will have a watermark hold 
that would be released when the key finished iterating - or was 
terminated due to timeout. I'm not sure if windowing as such plays any 
role in this, but maybe can.

>     On 6/23/21 10:53 PM, Reuven Lax wrote:
>>     One question I have is whether the use cases for cyclic graphs
>>     overlap substantially with the use cases for event-time
>>     watermarks. Many of the uses I'm aware of are ML-type algorithms
>>     (e.g. clustering) or iterative algorithms on large graphs
>>     (connected components, etc.), and it's unclear how many of these
>>     problems have a natural time component.
>>
>>     On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je.ik@seznam.cz
>>     <ma...@seznam.cz>> wrote:
>>
>>         Reuven, can you please elaborate a little on that? Why do you
>>         need watermark per iteration? Letting the watermark progress
>>         as soon as all the keys arriving before the upstream
>>         watermark terminate the cycle seems like a valid definition
>>         without the need to make the watermark multidimensional. Yes,
>>         it introduces (possibly unbounded) latency in downstream
>>         processing, but that is something that should be probably
>>         expected. The unboundness of the latency can be limited by
>>         either fixed timeout or number of iterations.
>>
>>         On 6/23/21 8:39 PM, Reuven Lax wrote:
>>>         This was explored in the past, though the design started
>>>         getting very complex (watermarks of unbounded dimension,
>>>         where each iteration has its own watermark dimension). At
>>>         the time, the exploration petered out.
>>>
>>>         On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský
>>>         <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>
>>>             Hi,
>>>
>>>             I'd like to discuss a very rough idea. I didn't walk
>>>             through all the
>>>             corner cases and the whole idea has a lot of rough
>>>             edges, so please bear
>>>             with me. I was thinking about non-IO applications of
>>>             splittable DoFn,
>>>             and the main idea - and why it is called splittable - is
>>>             that it can
>>>             handle unbounded outputs per element. Then I was
>>>             thinking about what can
>>>             generate unbounded outputs per element _without reading
>>>             from external
>>>             source_ (as that would be IO application) - and then I
>>>             realized that the
>>>             data can - at least theoretically - come from a
>>>             downstream transform. It
>>>             would have to be passed over an RPC (gRPC probably)
>>>             connection, it would
>>>             probably require some sort of service discovery - as the
>>>             feedback loop
>>>             would have to be correctly targeted based on key - and
>>>             so on (those are
>>>             the rough edges).
>>>
>>>             But supposing this can be solved - what iterations
>>>             actually mean is the
>>>             we have a side channel, that come from downstream
>>>             processing - and we
>>>             need a watermark estimator for this channel, that is
>>>             able to hold the
>>>             watermark back until the very last element (at a certain
>>>             watermark)
>>>             finishes the iteration. The idea is then we could - in
>>>             theory - create
>>>             an Iteration PTransform, that would take another
>>>             PTransform (probably
>>>             something like PTransform<PCollection<KV<K, V>>,
>>>             PCollection<KV<K,
>>>             IterationResult<K, V>>>, where the IterationResult<K, V>
>>>             would contain
>>>             the original KV<K, V> and a stopping condition (true,
>>>             false) and by
>>>             creating the feedback loop from the output of this
>>>             PCollection we could
>>>             actually implement this without any need of support on
>>>             the side of runners.
>>>
>>>             Does that seem like something that might be worth exploring?
>>>
>>>               Jan
>>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Reuven Lax <re...@google.com>.
On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je...@seznam.cz> wrote:

> The most qualitatively import use-case I see are ACID transactions -
> transactions naturally involve cycles, because the most natural
> implementation would be of something like "optimistic locking" where the
> transaction is allowed to progress until a downstream "commit" sees a
> conflict, when it needs to return the transaction back to the beginning
> (pretty much the same as how git resolves conflict in a push).
>
True, however within a transform one could use timers to implement this
(there are currently some bugs around looping timers I believe, but those
are easier to fix than implementing a brand new programming model).
Iterative is only really necessary if you need to iterate an entire
subgraph, including GroupByKeys, etc.


> Another application would be graph algorithms on changing graphs, where
> adding or removing an edge might trigger an iterative algorithm on the
> graph (and I'm absolutely not sure that the discussed approach can do that,
> this is just something, that would be cool to do :)).
>
Yes, that's what I had in mind. I'm just not sure that these algorithms
lend themselves to windowing. I.e. if we added iterative support, but did
not have support for windowing or watermarks across iterations, have we
solved most of the problem?

> On 6/23/21 10:53 PM, Reuven Lax wrote:
>
> One question I have is whether the use cases for cyclic graphs overlap
> substantially with the use cases for event-time watermarks. Many of the
> uses I'm aware of are ML-type algorithms (e.g. clustering) or iterative
> algorithms on large graphs (connected components, etc.), and it's unclear
> how many of these problems have a natural time component.
>
> On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Reuven, can you please elaborate a little on that? Why do you need
>> watermark per iteration? Letting the watermark progress as soon as all the
>> keys arriving before the upstream watermark terminate the cycle seems like
>> a valid definition without the need to make the watermark multidimensional.
>> Yes, it introduces (possibly unbounded) latency in downstream processing,
>> but that is something that should be probably expected. The unboundness of
>> the latency can be limited by either fixed timeout or number of iterations.
>> On 6/23/21 8:39 PM, Reuven Lax wrote:
>>
>> This was explored in the past, though the design started getting very
>> complex (watermarks of unbounded dimension, where each iteration has its
>> own watermark dimension). At the time, the exploration petered out.
>>
>> On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> I'd like to discuss a very rough idea. I didn't walk through all the
>>> corner cases and the whole idea has a lot of rough edges, so please bear
>>> with me. I was thinking about non-IO applications of splittable DoFn,
>>> and the main idea - and why it is called splittable - is that it can
>>> handle unbounded outputs per element. Then I was thinking about what can
>>> generate unbounded outputs per element _without reading from external
>>> source_ (as that would be IO application) - and then I realized that the
>>> data can - at least theoretically - come from a downstream transform. It
>>> would have to be passed over an RPC (gRPC probably) connection, it would
>>> probably require some sort of service discovery - as the feedback loop
>>> would have to be correctly targeted based on key - and so on (those are
>>> the rough edges).
>>>
>>> But supposing this can be solved - what iterations actually mean is the
>>> we have a side channel, that come from downstream processing - and we
>>> need a watermark estimator for this channel, that is able to hold the
>>> watermark back until the very last element (at a certain watermark)
>>> finishes the iteration. The idea is then we could - in theory - create
>>> an Iteration PTransform, that would take another PTransform (probably
>>> something like PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
>>> IterationResult<K, V>>>, where the IterationResult<K, V> would contain
>>> the original KV<K, V> and a stopping condition (true, false) and by
>>> creating the feedback loop from the output of this PCollection we could
>>> actually implement this without any need of support on the side of
>>> runners.
>>>
>>> Does that seem like something that might be worth exploring?
>>>
>>>   Jan
>>>
>>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Jan Lukavský <je...@seznam.cz>.
The most qualitatively import use-case I see are ACID transactions - 
transactions naturally involve cycles, because the most natural 
implementation would be of something like "optimistic locking" where the 
transaction is allowed to progress until a downstream "commit" sees a 
conflict, when it needs to return the transaction back to the beginning 
(pretty much the same as how git resolves conflict in a push).

Another application would be graph algorithms on changing graphs, where 
adding or removing an edge might trigger an iterative algorithm on the 
graph (and I'm absolutely not sure that the discussed approach can do 
that, this is just something, that would be cool to do :)).

On 6/23/21 10:53 PM, Reuven Lax wrote:
> One question I have is whether the use cases for cyclic graphs overlap 
> substantially with the use cases for event-time watermarks. Many of 
> the uses I'm aware of are ML-type algorithms (e.g. clustering) or 
> iterative algorithms on large graphs (connected components, etc.), and 
> it's unclear how many of these problems have a natural time component.
>
> On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     Reuven, can you please elaborate a little on that? Why do you need
>     watermark per iteration? Letting the watermark progress as soon as
>     all the keys arriving before the upstream watermark terminate the
>     cycle seems like a valid definition without the need to make the
>     watermark multidimensional. Yes, it introduces (possibly
>     unbounded) latency in downstream processing, but that is something
>     that should be probably expected. The unboundness of the latency
>     can be limited by either fixed timeout or number of iterations.
>
>     On 6/23/21 8:39 PM, Reuven Lax wrote:
>>     This was explored in the past, though the design started getting
>>     very complex (watermarks of unbounded dimension, where each
>>     iteration has its own watermark dimension). At the time, the
>>     exploration petered out.
>>
>>     On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je.ik@seznam.cz
>>     <ma...@seznam.cz>> wrote:
>>
>>         Hi,
>>
>>         I'd like to discuss a very rough idea. I didn't walk through
>>         all the
>>         corner cases and the whole idea has a lot of rough edges, so
>>         please bear
>>         with me. I was thinking about non-IO applications of
>>         splittable DoFn,
>>         and the main idea - and why it is called splittable - is that
>>         it can
>>         handle unbounded outputs per element. Then I was thinking
>>         about what can
>>         generate unbounded outputs per element _without reading from
>>         external
>>         source_ (as that would be IO application) - and then I
>>         realized that the
>>         data can - at least theoretically - come from a downstream
>>         transform. It
>>         would have to be passed over an RPC (gRPC probably)
>>         connection, it would
>>         probably require some sort of service discovery - as the
>>         feedback loop
>>         would have to be correctly targeted based on key - and so on
>>         (those are
>>         the rough edges).
>>
>>         But supposing this can be solved - what iterations actually
>>         mean is the
>>         we have a side channel, that come from downstream processing
>>         - and we
>>         need a watermark estimator for this channel, that is able to
>>         hold the
>>         watermark back until the very last element (at a certain
>>         watermark)
>>         finishes the iteration. The idea is then we could - in theory
>>         - create
>>         an Iteration PTransform, that would take another PTransform
>>         (probably
>>         something like PTransform<PCollection<KV<K, V>>,
>>         PCollection<KV<K,
>>         IterationResult<K, V>>>, where the IterationResult<K, V>
>>         would contain
>>         the original KV<K, V> and a stopping condition (true, false)
>>         and by
>>         creating the feedback loop from the output of this
>>         PCollection we could
>>         actually implement this without any need of support on the
>>         side of runners.
>>
>>         Does that seem like something that might be worth exploring?
>>
>>           Jan
>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Reuven Lax <re...@google.com>.
One question I have is whether the use cases for cyclic graphs overlap
substantially with the use cases for event-time watermarks. Many of the
uses I'm aware of are ML-type algorithms (e.g. clustering) or iterative
algorithms on large graphs (connected components, etc.), and it's unclear
how many of these problems have a natural time component.

On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je...@seznam.cz> wrote:

> Reuven, can you please elaborate a little on that? Why do you need
> watermark per iteration? Letting the watermark progress as soon as all the
> keys arriving before the upstream watermark terminate the cycle seems like
> a valid definition without the need to make the watermark multidimensional.
> Yes, it introduces (possibly unbounded) latency in downstream processing,
> but that is something that should be probably expected. The unboundness of
> the latency can be limited by either fixed timeout or number of iterations.
> On 6/23/21 8:39 PM, Reuven Lax wrote:
>
> This was explored in the past, though the design started getting very
> complex (watermarks of unbounded dimension, where each iteration has its
> own watermark dimension). At the time, the exploration petered out.
>
> On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> I'd like to discuss a very rough idea. I didn't walk through all the
>> corner cases and the whole idea has a lot of rough edges, so please bear
>> with me. I was thinking about non-IO applications of splittable DoFn,
>> and the main idea - and why it is called splittable - is that it can
>> handle unbounded outputs per element. Then I was thinking about what can
>> generate unbounded outputs per element _without reading from external
>> source_ (as that would be IO application) - and then I realized that the
>> data can - at least theoretically - come from a downstream transform. It
>> would have to be passed over an RPC (gRPC probably) connection, it would
>> probably require some sort of service discovery - as the feedback loop
>> would have to be correctly targeted based on key - and so on (those are
>> the rough edges).
>>
>> But supposing this can be solved - what iterations actually mean is the
>> we have a side channel, that come from downstream processing - and we
>> need a watermark estimator for this channel, that is able to hold the
>> watermark back until the very last element (at a certain watermark)
>> finishes the iteration. The idea is then we could - in theory - create
>> an Iteration PTransform, that would take another PTransform (probably
>> something like PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
>> IterationResult<K, V>>>, where the IterationResult<K, V> would contain
>> the original KV<K, V> and a stopping condition (true, false) and by
>> creating the feedback loop from the output of this PCollection we could
>> actually implement this without any need of support on the side of
>> runners.
>>
>> Does that seem like something that might be worth exploring?
>>
>>   Jan
>>
>>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Jan Lukavský <je...@seznam.cz>.
Reuven, can you please elaborate a little on that? Why do you need 
watermark per iteration? Letting the watermark progress as soon as all 
the keys arriving before the upstream watermark terminate the cycle 
seems like a valid definition without the need to make the watermark 
multidimensional. Yes, it introduces (possibly unbounded) latency in 
downstream processing, but that is something that should be probably 
expected. The unboundness of the latency can be limited by either fixed 
timeout or number of iterations.

On 6/23/21 8:39 PM, Reuven Lax wrote:
> This was explored in the past, though the design started getting very 
> complex (watermarks of unbounded dimension, where each iteration has 
> its own watermark dimension). At the time, the exploration petered out.
>
> On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     Hi,
>
>     I'd like to discuss a very rough idea. I didn't walk through all the
>     corner cases and the whole idea has a lot of rough edges, so
>     please bear
>     with me. I was thinking about non-IO applications of splittable DoFn,
>     and the main idea - and why it is called splittable - is that it can
>     handle unbounded outputs per element. Then I was thinking about
>     what can
>     generate unbounded outputs per element _without reading from external
>     source_ (as that would be IO application) - and then I realized
>     that the
>     data can - at least theoretically - come from a downstream
>     transform. It
>     would have to be passed over an RPC (gRPC probably) connection, it
>     would
>     probably require some sort of service discovery - as the feedback
>     loop
>     would have to be correctly targeted based on key - and so on
>     (those are
>     the rough edges).
>
>     But supposing this can be solved - what iterations actually mean
>     is the
>     we have a side channel, that come from downstream processing - and we
>     need a watermark estimator for this channel, that is able to hold the
>     watermark back until the very last element (at a certain watermark)
>     finishes the iteration. The idea is then we could - in theory -
>     create
>     an Iteration PTransform, that would take another PTransform (probably
>     something like PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
>     IterationResult<K, V>>>, where the IterationResult<K, V> would
>     contain
>     the original KV<K, V> and a stopping condition (true, false) and by
>     creating the feedback loop from the output of this PCollection we
>     could
>     actually implement this without any need of support on the side of
>     runners.
>
>     Does that seem like something that might be worth exploring?
>
>       Jan
>

Re: [DISCUSS] Do we have all the building block(s) to support iterations in Beam?

Posted by Reuven Lax <re...@google.com>.
This was explored in the past, though the design started getting very
complex (watermarks of unbounded dimension, where each iteration has its
own watermark dimension). At the time, the exploration petered out.

On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I'd like to discuss a very rough idea. I didn't walk through all the
> corner cases and the whole idea has a lot of rough edges, so please bear
> with me. I was thinking about non-IO applications of splittable DoFn,
> and the main idea - and why it is called splittable - is that it can
> handle unbounded outputs per element. Then I was thinking about what can
> generate unbounded outputs per element _without reading from external
> source_ (as that would be IO application) - and then I realized that the
> data can - at least theoretically - come from a downstream transform. It
> would have to be passed over an RPC (gRPC probably) connection, it would
> probably require some sort of service discovery - as the feedback loop
> would have to be correctly targeted based on key - and so on (those are
> the rough edges).
>
> But supposing this can be solved - what iterations actually mean is the
> we have a side channel, that come from downstream processing - and we
> need a watermark estimator for this channel, that is able to hold the
> watermark back until the very last element (at a certain watermark)
> finishes the iteration. The idea is then we could - in theory - create
> an Iteration PTransform, that would take another PTransform (probably
> something like PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
> IterationResult<K, V>>>, where the IterationResult<K, V> would contain
> the original KV<K, V> and a stopping condition (true, false) and by
> creating the feedback loop from the output of this PCollection we could
> actually implement this without any need of support on the side of runners.
>
> Does that seem like something that might be worth exploring?
>
>   Jan
>
>