Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2019/05/23 14:10:37 UTC

@RequireTimeSortedInput design draft

Hi,

I have written a very brief draft of how it might be possible to
implement @RequireTimeSortedInput discussed in [1]. I see the document
[2] as a starting point for a discussion. There are several open
questions, which I believe can be resolved by this great community. :-)

Jan

[1] http://mail-archives.apache.org/mod_mbox/beam-dev/201905.mbox/browser

[2] 
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/


Re: @RequireTimeSortedInput design draft

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Reza,

> if you are interested and have the bandwidth would be great to have
> you as a reviewer for the PR.

I'd be happy to.

Cheers,

  Jan


Re: @RequireTimeSortedInput design draft

Posted by Reza Rokni <re...@google.com>.
Hi,

Interesting reading on issue 143 :-) My example is more specific in its
scope, but I suspect the general pattern will be useful for most timeseries.

The specific Jira is:

https://issues.apache.org/jira/browse/BEAM-7386?filter=-2

The signature is currently of the form:

public static class BiTemporalJoin<K, V1, V2>
    extends PTransform<KeyedPCollectionTuple<K>,
        PCollection<BiTemporalJoinResult<K, V1, V2>>>

If you are interested and have the bandwidth, it would be great to have you
as a reviewer for the PR. I also hope to get time to contribute more around
timeseries utilities, and it would be great to have collaborators! I have not
looked into the details of Euphoria (looks interesting!), but it should be
reasonably straightforward to make use of the class in other places.

Cheers

Reza



-- 

This email may be confidential and privileged. If you received this
communication by mistake, please don't forward it to anyone else, please
erase all copies and attachments, and please let me know that it has gone
to the wrong person.

The above terms reflect a potential business arrangement, are provided
solely as a basis for further discussion, and are not intended to be and do
not constitute a legally binding obligation. No legally binding obligations
will be created, implied, or inferred until an agreement in final form is
executed in writing by all parties involved.

Re: @RequireTimeSortedInput design draft

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Reza, interesting suggestions, thanks.

When you mentioned the join, I recalled an older issue [1] (which
apparently has not yet been transferred to Beam's JIRA). Is it related in
any way to what you are implementing? Would you like to make your
implementation accessible via the Euphoria DSL [2]?

  Jan

[1] https://github.com/seznam/euphoria/issues/143

[2] 
https://github.com/apache/beam/blob/master/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java


Re: @RequireTimeSortedInput design draft

Posted by Reza Rokni <re...@google.com>.
Hi Jan,

I have been working on a timeseries extension that makes use of many of
these techniques for joining two temporal streams. It is almost ready for
a PR; I will post it here when it is, as it might be useful for you. In
general, I borrowed a lot of techniques from the CoGroupBy code.

*1) need to figure out how to get Coder of input PCollection of stateful
ParDo inside StatefulDoFnRunner*
My join takes in a <K, V1, V2>. In the outer transform I use things like
leftCollection.getCoder()).getValueCoder(); then, when creating the Join
transform, I can defer the StateSpec object creation until the constructor
is called.

*2) there are performance considerations, that can be solved probably only
by Sorted Map State [2]*
Sorted Map is going to be awesome. Until then, the only option is to create
a cache in the DoFn to make it more efficient. For the cache to work you
need to key on window + key and do things like clear the cache in
@StartBundle. Better to wait for Sorted Map if this is not time-critical.

*3) additional work is needed for allowedLateness to work correctly (and
there are at least two ways how to solve this), see the design doc [3]*
Yup, in my case I can support this by not garbage-collecting the right side
of the join for now, but that is a compromise.

*4) more tests (for batch and validatesRunner) are needed*
I just posted a question on this list about the best way to make use of the
@ValidatesRunner annotation; it sounds like it might be useful to you as
well :-)
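
A minimal sketch of the cache idea from point 2, in plain Java without any
Beam dependency. All names here (BundleCacheSketch, WindowAndKey, startBundle,
get) are hypothetical, and the Supplier only stands in for a real state read.
In an actual DoFn, startBundle() would presumably be called from the method
annotated with @StartBundle.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

/** Hypothetical per-bundle cache keyed on (window, key); not Beam API. */
class BundleCacheSketch<V> {

  /** Composite cache key: the same user key in two windows must not collide. */
  static final class WindowAndKey {
    final String window;
    final String key;

    WindowAndKey(String window, String key) {
      this.window = window;
      this.key = key;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof WindowAndKey)) {
        return false;
      }
      WindowAndKey other = (WindowAndKey) o;
      return window.equals(other.window) && key.equals(other.key);
    }

    @Override
    public int hashCode() {
      return Objects.hash(window, key);
    }
  }

  private final Map<WindowAndKey, V> cache = new HashMap<>();

  /** Counts how often the (assumed expensive) state read actually happened. */
  int loads = 0;

  /** Drops everything cached during the previous bundle. */
  void startBundle() {
    cache.clear();
  }

  /** Returns the cached value, reading from state only on a cache miss. */
  V get(String window, String key, Supplier<V> readFromState) {
    return cache.computeIfAbsent(
        new WindowAndKey(window, key),
        k -> {
          loads++;
          return readFromState.get();
        });
  }
}
```

Within one bundle, repeated lookups for the same (window, key) pair hit the
map; after startBundle() the next lookup reads from state again.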



Re: @RequireTimeSortedInput design draft

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

I have written a PoC implementation of this in [1] and I'd like to 
discuss some implementation details. First of all, I'd appreciate any 
feedback about this. There are some known issues:

  1) need to figure out how to get Coder of input PCollection of 
stateful ParDo inside StatefulDoFnRunner

  2) there are performance considerations, that can be solved probably 
only by Sorted Map State [2]

  3) additional work is needed for allowedLateness to work correctly 
(and there are at least two ways how to solve this), see the design doc [3]

  4) more tests (for batch and validatesRunner) are needed

I have come across a few bugs in DirectRunner, which I tried to solve:

  a) timers seem to be broken in stateful pardo with side inputs

  b) timers need to be sorted by timestamp, otherwise state might be
cleared before it gets a chance to be flushed
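
To illustrate b), here is a small sketch in plain Java (no Beam dependency)
of why firing order matters. The names TimerOrderSketch, FiringTimer and run
are hypothetical, and the two timer kinds are a simplification: a "flush"
timer emits the buffered state, a "cleanup" timer discards it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration of timer ordering; not DirectRunner code. */
class TimerOrderSketch {

  /** A timer reduced to its firing timestamp and its kind. */
  static final class FiringTimer {
    final long timestamp;
    final String kind; // "flush" or "cleanup"

    FiringTimer(long timestamp, String kind) {
      this.timestamp = timestamp;
      this.kind = kind;
    }
  }

  /** Fires the timers against a small buffer and returns what was emitted. */
  static List<String> run(List<FiringTimer> timers, boolean sortByTimestamp) {
    List<FiringTimer> order = new ArrayList<>(timers);
    if (sortByTimestamp) {
      // Stable sort: timers with equal timestamps keep their relative order.
      order.sort(Comparator.comparingLong((FiringTimer t) -> t.timestamp));
    }
    List<String> state = new ArrayList<>(Arrays.asList("a", "b"));
    List<String> emitted = new ArrayList<>();
    for (FiringTimer t : order) {
      if (t.kind.equals("flush")) {
        emitted.addAll(state); // flush: output the buffered elements
      }
      state.clear(); // both flush and cleanup leave the state empty
    }
    return emitted;
  }
}
```

With a cleanup timer at 20 and a flush timer at 10 delivered in that
(unsorted) order, nothing is emitted; sorting by timestamp first emits both
buffered elements.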


Thanks for any feedback,

  Jan


[1] https://github.com/apache/beam/pull/8774

[2] 
http://mail-archives.apache.org/mod_mbox/beam-dev/201905.mbox/%3cCALsTK6+LdEmTjmnUYSn3vCufywjkhMgv1iSFBdMXTHoqH91xTQ@mail.gmail.com%3e

[3] 
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/



Re: @RequireTimeSortedInput design draft

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

I have updated the proposal [1] with the feedback gathered so far, and I
tried to resolve most of the issues that were mentioned. I'd be grateful
for another round of comments.

Thanks,

  Jan

[1] 
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/


Re: @RequireTimeSortedInput design draft

Posted by Robert Bradshaw <ro...@google.com>.
Thanks for writing this up.

I think the justification for adding this to the model needs to be
that it is useful (you have this covered, though some examples would
be nice) and that it's something that can't easily be done by users
themselves (specifically, though it can be (relatively) cheaply done
in streaming and batch, it's done in very different ways, and also
that it's hard to do via composition).
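
As one possible example of the streaming side, here is a sketch of a per-key
buffer that holds elements until the watermark passes them and then releases
them in timestamp order. It is plain Java, not Beam API: TimeSortedBuffer and
its methods are hypothetical, the watermark is simplified to mean "no further
elements with timestamp <= watermark", and allowed lateness is assumed to be
zero, so elements behind the watermark are dropped.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical watermark-driven sorting buffer for a single key. */
class TimeSortedBuffer<T> {

  /** An element paired with its event timestamp. */
  static final class Stamped<T> {
    final long timestamp;
    final T value;

    Stamped(long timestamp, T value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Min-heap on timestamp: the earliest buffered element is always at the head.
  // Elements with equal timestamps come out in unspecified relative order.
  private final PriorityQueue<Stamped<T>> buffer =
      new PriorityQueue<>(Comparator.comparingLong((Stamped<T> s) -> s.timestamp));

  private long watermark = Long.MIN_VALUE;

  /** Buffers an element; returns false if it is behind the watermark and dropped. */
  boolean add(long timestamp, T value) {
    if (timestamp <= watermark) {
      return false;
    }
    buffer.add(new Stamped<>(timestamp, value));
    return true;
  }

  /** Advances the watermark and returns, in timestamp order, all elements at or before it. */
  List<T> advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    List<T> ready = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
      ready.add(buffer.poll().value);
    }
    return ready;
  }
}
```

In batch the same ordering could presumably be obtained by sorting each key's
elements once, with no watermark involved, which is one way the two modes
differ.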


Re: Proposal: @RequiresTimeSortedInput

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Kenn,

I would continue this discussion on thread [1] (as you also propose)
and consider all the other threads on this topic closed.

I will address your latest note in a reply there.

Jan

[1] 
https://lists.apache.org/thread.html/e2f729c7cea22553fc34421d4547132fa1c2ec01035eb4fb1a426873@%3Cdev.beam.apache.org%3E

On 11/15/19 5:56 AM, Kenneth Knowles wrote:
> Hi Jan,
>
> Sorry for the very slow reply.
>
> Your proposed feature is sensitive to all data that is not in 
> timestamp order, which is not the same as late. In Beam "late" is 
> defined as "assigned to a window where the watermark has passed the 
> end of the window and a 'final' aggregate has been produced". Your 
> proposal is not really sensitive to this form of late data.
>
> I think there is some published work that will help you, particularly
> in addressing out-of-order data. Note that this is not the normal
> notion of late. Trill has a high-watermark driven sorting buffer
> prior to sending elements in order to stateful operators. It is
> similar to your sketched algorithm for emitting elements as the
> watermark passes. I believe Gearpump also uses a sorting buffer and
> processes in order, and we do have a Gearpump runner still here in our
> repo.
>
> Kenn
>
> On Mon, Nov 4, 2019 at 3:54 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>     Hi,
>
>     there has been some development around this [1], which essentially
>     concludes that currently this feature can be safely supported only by
>     the direct runner, the flink runner (both batch and streaming,
>     non-portable only) and spark (batch, legacy only). This is because
>     time sorting relies heavily on timers being strictly ordered. Failing
>     to do so might result in unpredictable data loss, due to window-cleanup
>     of state occurring prior to all elements being emitted (note that this
>     generally might happen even to current user pipelines!). I can link
>     issues [2], [3] and [4] to [5], but the question is, with so few
>     runners being able to support this, what would be the best way to
>     incorporate this into any upcoming release (I'm assuming that this
>     will pass a vote, which is not known yet)? I'd say that the best way
>     would be for the affected runners to fail to execute the pipeline until
>     the respective issues are resolved. Another option would be to block
>     this until the issues are resolved in runners, but that might delay
>     the availability of this feature for some unknown time.
>
>     Thanks for any opinions,
>
>     Jan
>
>     [1]
>     https://lists.apache.org/thread.html/71a8f48ca518f1f2e6e9b1284114624670884775d209b0097f68264b@%3Cdev.beam.apache.org%3E
>
>     [2] https://issues.apache.org/jira/browse/BEAM-8459
>
>     [3] https://issues.apache.org/jira/browse/BEAM-8460
>
>     [4] https://issues.apache.org/jira/browse/BEAM-8543
>
>     [5] https://issues.apache.org/jira/browse/BEAM-8550
>
>     On 10/31/19 2:59 PM, Jan Lukavský wrote:
>     > Hi,
>     >
>     > as a follow-up to the previous design draft, I'd like to promote the
>     > document [1] and the associated PR [2] to a proposal.
>     >
>     > The PR contains a working implementation for:
>     >
>     >  - non-portable batch Flink and batch Spark (legacy)
>     >
>     >  - all non-portable streaming runners that use StatefulDoFnRunner
>     > (direct, Samza, Dataflow)
>     >
>     >  - portable Flink (batch, streaming)
>     >
>     > There are still some unresolved issues:
>     >
>     >  a) there is no way to specify allowed lateness (it is currently
>     > simply zero, so late data should be dropped)
>     >
>     >  b) a way is needed to specify a user-defined function (UDF) for
>     > extracting the timestamp (according to [3] it would be useful to
>     > have that option)
>     >
>     >  c) more tests need to be added (e.g. for late data)
>     >
>     > The plan is to postpone resolution of issues a) and b) until after
>     > the proposal is merged. I'd like to gather some more feedback on the
>     > proposal, iterate over that again, add more tests and then put this
>     > to a vote.
>     >
>     > Unrelated: during implementation, a bug [4] in the Samza runner was
>     > found.
>     > Looking forward to any comments!
>     >
>     > Jan
>     >
>     > [1]
>     > https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
>     >
>     >
>     > [2] https://github.com/apache/beam/pull/8774
>     >
>     > [3]
>     > https://lists.apache.org/thread.html/813429e78b895a336d4f5507e3b2330282e2904fa25d52d6d441741a@%3Cdev.beam.apache.org%3E
>     >
>     > [4] https://issues.apache.org/jira/browse/BEAM-8529
>     >
>     >
>     > On 5/23/19 4:10 PM, Jan Lukavský wrote:
>     >> Hi,
>     >>
>     >> I have written a very brief draft of how it might be possible to
>     >> implement @RequireTimeSortedInput discussed in [1]. I see the
>     >> document [2] as a starting point for a discussion. There are several
>     >> open questions, which I believe can be resolved by this great
>     >> community. :-)
>     >>
>     >> Jan
>     >>
>     >> [1]
>     >> https://lists.apache.org/thread.html/4609a1bb1662690d67950e76d2f1108b51327b8feaf9580de659552e@%3Cdev.beam.apache.org%3E
>     >>
>     >> [2]
>     >> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
>     >>
>

Re: Proposal: @RequiresTimeSortedInput

Posted by Kenneth Knowles <ke...@apache.org>.
Hi Jan,

Sorry for the very slow reply.

Your proposed feature is sensitive to all data that is not in timestamp
order, which is not the same as late. In Beam "late" is defined as
"assigned to a window where the watermark has passed the end of the window
and a 'final' aggregate has been produced". Your proposal is not really
sensitive to this form of late data.

I think there is some published work that will help you particularly in
addressing out-of-order data. Note that this is not the normal notion of
late. Trill has a high-watermark driven sorting buffer prior to sending
elements in order to stateful operators. It is similar to your sketched
algorithm for emitting elements as the watermark passes. I believe Gearpump
also uses a sorting buffer and processes in order, and we do have a
Gearpump runner still here in our repo.
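
The watermark-driven sorting buffer described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not code from Beam, Trill, or Gearpump: the class name WatermarkSortingBuffer, its methods, and the simplified notion of a watermark (a single long below which no further elements are expected) are all hypothetical, and it deliberately ignores Beam's window-based definition of "late".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical illustration only; not Beam, Trill, or Gearpump code. */
final class WatermarkSortingBuffer<T> {

  /** An element paired with its event timestamp (e.g. milliseconds). */
  static final class Stamped<T> {
    final long timestamp;
    final T value;

    Stamped(long timestamp, T value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Min-heap ordered by timestamp, so the oldest buffered element is at the head.
  private final PriorityQueue<Stamped<T>> buffer =
      new PriorityQueue<>(Comparator.comparingLong((Stamped<T> s) -> s.timestamp));

  // Assumption: elements with a timestamp below the watermark are no longer expected.
  private long watermark = Long.MIN_VALUE;

  /** Buffers an element; returns false if it is behind the watermark and was dropped. */
  boolean add(long timestamp, T value) {
    if (timestamp < watermark) {
      return false; // out of order beyond repair: emitting it now would break sorting
    }
    buffer.add(new Stamped<>(timestamp, value));
    return true;
  }

  /** Advances the watermark and returns, in timestamp order, all elements strictly below it. */
  List<T> advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark); // the watermark never moves backwards
    List<T> ready = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestamp < watermark) {
      ready.add(buffer.poll().value);
    }
    return ready;
  }
}
```

In this sketch a downstream stateful operator would consume only the lists returned by advanceWatermark, so it sees elements in timestamp order no matter how they arrived. Elements with equal timestamps come out in unspecified relative order.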

Kenn

On Mon, Nov 4, 2019 at 3:54 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> there has been some development around this [1], which essentially
> concludes that currently this feature can be safely supported only by
> the direct runner, the Flink runner (both batch and streaming,
> non-portable only) and Spark (batch, legacy only). This is because time
> sorting relies heavily on timers being strictly ordered. Without strict
> ordering, data might be lost unpredictably, because window cleanup of
> state can occur before all elements have been emitted (note that this
> generally might happen even to current user pipelines!). I can link
> issues [2], [3] and [4] to [5], but the question is, with so few
> runners being able to support this, what would be the best way to
> incorporate this into an upcoming release (I'm assuming that this will
> pass a vote, which is not known yet)? I'd say that the best way would be
> for the affected runners to fail to execute the pipeline until the
> respective issues are resolved. Another option would be to block this
> until the issues are resolved in runners, but that might delay the
> availability of this feature for some unknown time.
>
> Thanks for any opinions,
>
> Jan
>
> [1]
>
> https://lists.apache.org/thread.html/71a8f48ca518f1f2e6e9b1284114624670884775d209b0097f68264b@%3Cdev.beam.apache.org%3E
>
> [2] https://issues.apache.org/jira/browse/BEAM-8459
>
> [3] https://issues.apache.org/jira/browse/BEAM-8460
>
> [4] https://issues.apache.org/jira/browse/BEAM-8543
>
> [5] https://issues.apache.org/jira/browse/BEAM-8550
>
> On 10/31/19 2:59 PM, Jan Lukavský wrote:
> > Hi,
> >
> > as a follow-up to the previous design draft, I'd like to promote the
> > document [1] and the associated PR [2] to a proposal.
> >
> > The PR contains a working implementation for:
> >
> >  - non-portable batch Flink and batch Spark (legacy)
> >
> >  - all non-portable streaming runners that use StatefulDoFnRunner
> > (direct, Samza, Dataflow)
> >
> >  - portable Flink (batch, streaming)
> >
> > There are still some unresolved issues:
> >
> >  a) there is no way to specify allowed lateness (it is currently
> > simply zero, so late data should be dropped)
> >
> >  b) a way is needed to specify a user-defined function (UDF) for
> > extracting the timestamp (according to [3] it would be useful to have
> > that option)
> >
> >  c) more tests need to be added (e.g. for late data)
> >
> > The plan is to postpone resolution of issues a) and b) until after the
> > proposal is merged. I'd like to gather some more feedback on the
> > proposal, iterate over that again, add more tests and then put this
> > to a vote.
> >
> > Unrelated: during implementation, a bug [4] in the Samza runner was
> > found.
> >
> > Looking forward to any comments!
> >
> > Jan
> >
> > [1]
> > https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
> >
> >
> > [2] https://github.com/apache/beam/pull/8774
> >
> > [3]
> > https://lists.apache.org/thread.html/813429e78b895a336d4f5507e3b2330282e2904fa25d52d6d441741a@%3Cdev.beam.apache.org%3E
> >
> > [4] https://issues.apache.org/jira/browse/BEAM-8529
> >
> >
> > On 5/23/19 4:10 PM, Jan Lukavský wrote:
> >> Hi,
> >>
> >> I have written a very brief draft of how it might be possible to
> >> implement @RequireTimeSortedInput discussed in [1]. I see the
> >> document [2] as a starting point for a discussion. There are several
> >> open questions, which I believe can be resolved by this great
> >> community. :-)
> >>
> >> Jan
> >>
> >> [1]
> >> https://lists.apache.org/thread.html/4609a1bb1662690d67950e76d2f1108b51327b8feaf9580de659552e@%3Cdev.beam.apache.org%3E
> >>
> >> [2]
> >> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
> >>
>

Re: Proposal: @RequiresTimeSortedInput

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

there has been some development around this [1], which essentially
concludes that currently this feature can be safely supported only by
the direct runner, the Flink runner (both batch and streaming,
non-portable only) and Spark (batch, legacy only). This is because time
sorting relies heavily on timers being strictly ordered. Without strict
ordering, data might be lost unpredictably, because window cleanup of
state can occur before all elements have been emitted (note that this
generally might happen even to current user pipelines!). I can link
issues [2], [3] and [4] to [5], but the question is, with so few
runners being able to support this, what would be the best way to
incorporate this into an upcoming release (I'm assuming that this will
pass a vote, which is not known yet)? I'd say that the best way would be
for the affected runners to fail to execute the pipeline until the
respective issues are resolved. Another option would be to block this
until the issues are resolved in runners, but that might delay the
availability of this feature for some unknown time.
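
The data-loss scenario above can be illustrated with a small simulation. This is only a sketch under assumptions: the names TimerOrderingDemo, SimTimer, FLUSH and CLEANUP are hypothetical and do not correspond to Beam internals. It models a sorting buffer whose contents are emitted by a flush timer and discarded by a window-cleanup timer, and shows that the result depends on the order in which the runner fires the two.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical simulation; names and structure are assumptions, not Beam internals. */
final class TimerOrderingDemo {

  enum TimerKind { FLUSH, CLEANUP }

  static final class SimTimer {
    final long timestamp;
    final TimerKind kind;

    SimTimer(long timestamp, TimerKind kind) {
      this.timestamp = timestamp;
      this.kind = kind;
    }
  }

  /** Returns a copy of the timers sorted by timestamp, i.e. a strictly ordered firing sequence. */
  static List<SimTimer> inTimestampOrder(List<SimTimer> timers) {
    List<SimTimer> sorted = new ArrayList<>(timers);
    sorted.sort(Comparator.comparingLong((SimTimer t) -> t.timestamp));
    return sorted;
  }

  /** Fires the timers in the given order and returns the elements that were emitted. */
  static List<String> fire(List<String> buffered, List<SimTimer> firingOrder) {
    List<String> state = new ArrayList<>(buffered); // elements waiting in the sorting buffer
    List<String> emitted = new ArrayList<>();
    for (SimTimer timer : firingOrder) {
      if (timer.kind == TimerKind.FLUSH) {
        emitted.addAll(state); // flush: hand buffered elements to the user function
        state.clear();
      } else {
        state.clear(); // cleanup: whatever is still buffered is silently discarded
      }
    }
    return emitted;
  }
}
```

With a flush timer at timestamp 99 and a cleanup timer at 100, firing them in timestamp order emits every buffered element. Firing the cleanup timer first emits nothing, which is the unpredictable loss described above.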

Thanks for any opinions,

Jan

[1] 
https://lists.apache.org/thread.html/71a8f48ca518f1f2e6e9b1284114624670884775d209b0097f68264b@%3Cdev.beam.apache.org%3E

[2] https://issues.apache.org/jira/browse/BEAM-8459

[3] https://issues.apache.org/jira/browse/BEAM-8460

[4] https://issues.apache.org/jira/browse/BEAM-8543

[5] https://issues.apache.org/jira/browse/BEAM-8550

On 10/31/19 2:59 PM, Jan Lukavský wrote:
> Hi,
>
> as a follow-up to the previous design draft, I'd like to promote the
> document [1] and the associated PR [2] to a proposal.
>
> The PR contains a working implementation for:
>
>  - non-portable batch Flink and batch Spark (legacy)
>
>  - all non-portable streaming runners that use StatefulDoFnRunner
> (direct, Samza, Dataflow)
>
>  - portable Flink (batch, streaming)
>
> There are still some unresolved issues:
>
>  a) there is no way to specify allowed lateness (it is currently simply
> zero, so late data should be dropped)
>
>  b) a way is needed to specify a user-defined function (UDF) for
> extracting the timestamp (according to [3] it would be useful to have
> that option)
>
>  c) more tests need to be added (e.g. for late data)
>
> The plan is to postpone resolution of issues a) and b) until after the
> proposal is merged. I'd like to gather some more feedback on the
> proposal, iterate over that again, add more tests and then put this
> to a vote.
>
> Unrelated: during implementation, a bug [4] in the Samza runner was
> found.
>
> Looking forward to any comments!
>
> Jan
>
> [1] 
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/ 
>
>
> [2] https://github.com/apache/beam/pull/8774
>
> [3] 
> https://lists.apache.org/thread.html/813429e78b895a336d4f5507e3b2330282e2904fa25d52d6d441741a@%3Cdev.beam.apache.org%3E
>
> [4] https://issues.apache.org/jira/browse/BEAM-8529
>
>
> On 5/23/19 4:10 PM, Jan Lukavský wrote:
>> Hi,
>>
>> I have written a very brief draft of how it might be possible to 
>> implement @RequireTimeSortedInput discussed in [1]. I see the 
>> document [2] as a starting point for a discussion. There are several 
>> open questions, which I believe can be resolved by this great 
>> community. :-)
>>
>> Jan
>>
>> [1] 
>> https://lists.apache.org/thread.html/4609a1bb1662690d67950e76d2f1108b51327b8feaf9580de659552e@%3Cdev.beam.apache.org%3E
>>
>> [2] 
>> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
>>

Proposal: @RequiresTimeSortedInput

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

as a follow-up to the previous design draft, I'd like to promote the
document [1] and the associated PR [2] to a proposal.

The PR contains a working implementation for:

  - non-portable batch Flink and batch Spark (legacy)

  - all non-portable streaming runners that use StatefulDoFnRunner
(direct, Samza, Dataflow)

  - portable Flink (batch, streaming)

There are still some unresolved issues:

  a) there is no way to specify allowed lateness (it is currently simply
zero, so late data should be dropped)

  b) a way is needed to specify a user-defined function (UDF) for
extracting the timestamp (according to [3] it would be useful to have
that option)

  c) more tests need to be added (e.g. for late data)
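
To make issues a) and b) above more concrete, here is one possible shape such options could take. It is a sketch under assumptions, not the API of the PR: the class LatenessAwareSorter, its constructor parameters, and the interpretation of allowed lateness (elements are held until the watermark has passed their timestamp by more than allowedLateness, and anything older than that on arrival is dropped) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

/** Hypothetical sketch of a sorter with a user timestamp function and allowed lateness. */
final class LatenessAwareSorter<T> {

  private final ToLongFunction<T> timestampFn; // issue b): user-supplied timestamp extractor
  private final long allowedLateness;          // issue a): assumed to be >= 0
  private final PriorityQueue<T> buffer;
  private long watermark = Long.MIN_VALUE;

  LatenessAwareSorter(ToLongFunction<T> timestampFn, long allowedLateness) {
    this.timestampFn = timestampFn;
    this.allowedLateness = allowedLateness;
    this.buffer = new PriorityQueue<>(Comparator.comparingLong(timestampFn));
  }

  /** Timestamp below which elements are final: the watermark minus the allowed lateness. */
  private long cutoff() {
    // Guard against underflow while the watermark is still at its initial value.
    return watermark < Long.MIN_VALUE + allowedLateness
        ? Long.MIN_VALUE
        : watermark - allowedLateness;
  }

  /** Buffers an element; returns false if it is older than the cutoff and was dropped. */
  boolean offer(T element) {
    if (timestampFn.applyAsLong(element) < cutoff()) {
      return false;
    }
    buffer.add(element);
    return true;
  }

  /** Advances the watermark and returns, in timestamp order, all elements below the cutoff. */
  List<T> advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    List<T> ready = new ArrayList<>();
    while (!buffer.isEmpty() && timestampFn.applyAsLong(buffer.peek()) < cutoff()) {
      ready.add(buffer.poll());
    }
    return ready;
  }
}
```

With allowedLateness set to zero this reduces to the behaviour described in a), where anything behind the watermark is dropped. A larger value trades extra buffering and latency for tolerance of out-of-order input.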

The plan is to postpone resolution of issues a) and b) until after the
proposal is merged. I'd like to gather some more feedback on the
proposal, iterate over that again, add more tests and then put this to
a vote.

Unrelated: during implementation, a bug [4] in the Samza runner was found.

Looking forward to any comments!

Jan

[1] 
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/ 


[2] https://github.com/apache/beam/pull/8774

[3] 
https://lists.apache.org/thread.html/813429e78b895a336d4f5507e3b2330282e2904fa25d52d6d441741a@%3Cdev.beam.apache.org%3E

[4] https://issues.apache.org/jira/browse/BEAM-8529


On 5/23/19 4:10 PM, Jan Lukavský wrote:
> Hi,
>
> I have written a very brief draft of how it might be possible to 
> implement @RequireTimeSortedInput discussed in [1]. I see the document 
> [2] as a starting point for a discussion. There are several open 
> questions, which I believe can be resolved by this great community. :-)
>
> Jan
>
> [1] 
> https://lists.apache.org/thread.html/4609a1bb1662690d67950e76d2f1108b51327b8feaf9580de659552e@%3Cdev.beam.apache.org%3E
>
> [2] 
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
>