Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2019/10/31 13:59:20 UTC
Proposal: @RequiresTimeSortedInput
Hi,
as a follow-up to the previous design draft, I'd like to promote the
document [1] and the associated PR [2] to a proposal.
The PR contains a working implementation for:
- non-portable batch Flink and batch Spark (legacy)
- all non-portable streaming runners that use StatefulDoFnRunner
(Direct, Samza, Dataflow)
- portable Flink (batch, streaming)
There are still some unresolved issues:
a) there is no way to specify allowed lateness (currently it is simply
zero, so late data should be dropped)
b) we need a way to specify a user-defined function (UDF) for extracting
the timestamp (according to [3], it would be useful to have that option)
c) we need to add more tests (e.g. for late data)
The plan is to postpone resolution of issues a) and b) until after the
proposal is merged. I'd like to gather some more feedback on the
proposal, iterate on it again, add more tests and then put it to
a vote.
Unrelated: during implementation, a bug [4] in the Samza runner was found.
Looking forward to any comments!
Jan
[1]
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
[2] https://github.com/apache/beam/pull/8774
[3]
https://lists.apache.org/thread.html/813429e78b895a336d4f5507e3b2330282e2904fa25d52d6d441741a@%3Cdev.beam.apache.org%3E
[4] https://issues.apache.org/jira/browse/BEAM-8529
On 5/23/19 4:10 PM, Jan Lukavský wrote:
> Hi,
>
> I have written a very brief draft of how it might be possible to
> implement @RequireTimeSortedInput, as discussed in [1]. I see the
> document [2] as a starting point for a discussion. There are several
> open questions, which I believe can be resolved by this great community. :-)
>
> Jan
>
> [1]
> https://lists.apache.org/thread.html/4609a1bb1662690d67950e76d2f1108b51327b8feaf9580de659552e@%3Cdev.beam.apache.org%3E
>
> [2]
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
>
Re: Proposal: @RequiresTimeSortedInput
Posted by Jan Lukavský <je...@seznam.cz>.
Hi Kenn,
I would continue this discussion on thread [1] (as you also propose)
and consider all the other threads on this topic closed.
I will address your latest note in a reply there.
Jan
[1]
https://lists.apache.org/thread.html/e2f729c7cea22553fc34421d4547132fa1c2ec01035eb4fb1a426873@%3Cdev.beam.apache.org%3E
On 11/15/19 5:56 AM, Kenneth Knowles wrote:
> Hi Jan,
>
> Sorry for the very slow reply.
>
> Your proposed feature is sensitive to all data that is not in
> timestamp order, which is not the same as late. In Beam "late" is
> defined as "assigned to a window where the watermark has passed the
> end of the window and a 'final' aggregate has been produced". Your
> proposal is not really sensitive to this form of late data.
>
> I think there is some published work that will help you particularly
> in addressing out-of-order data. Note that this is not the normal
> notion of late. Trill has a high-watermark driven sorting buffer
> prior to sending elements in order to stateful operators. It is
> similar to your sketched algorithm for emitting elements as the
> watermark passes. I believe Gearpump also uses a sorting buffer and
> processes in order, and we do have a Gearpump runner still here in our
> repo.
>
> Kenn
>
Re: Proposal: @RequiresTimeSortedInput
Posted by Kenneth Knowles <ke...@apache.org>.
Hi Jan,
Sorry for the very slow reply.
Your proposed feature is sensitive to all data that is not in timestamp
order, which is not the same as late. In Beam "late" is defined as
"assigned to a window where the watermark has passed the end of the window
and a 'final' aggregate has been produced". Your proposal is not really
sensitive to this form of late data.
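To make the distinction concrete, here is a minimal sketch in Python. It is not Beam code: the function names are hypothetical, the window end is treated as exclusive, and the "final aggregate has been produced" condition and allowed lateness are ignored for simplicity.

```python
def is_out_of_order(timestamp, max_seen_timestamp):
    """True if the element arrives after an element with a larger timestamp."""
    return timestamp < max_seen_timestamp


def is_late(window_end, watermark):
    """Simplified lateness check: the watermark has passed the window end.

    Assumption: window_end is exclusive; allowed lateness is ignored.
    """
    return watermark >= window_end


def classify(arrivals, window_end, watermark):
    """Return (timestamp, out_of_order, late) for each arrival, in arrival order."""
    max_seen = float("-inf")
    result = []
    for ts in arrivals:
        result.append(
            (ts, is_out_of_order(ts, max_seen), is_late(window_end, watermark))
        )
        max_seen = max(max_seen, ts)
    return result


# Window [0, 60), watermark at 20: the element with timestamp 10 arrives
# after 30, so it is out of order, yet nothing here is late.
for row in classify([30, 10, 40], window_end=60, watermark=20):
    print(row)
```

In this toy run the second element is out of order without being late, which is the case a time-sorting feature has to handle.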
I think there is some published work that will help you particularly in
addressing out-of-order data. Note that this is not the normal notion of
late. Trill has a high-watermark driven sorting buffer prior to sending
elements in order to stateful operators. It is similar to your sketched
algorithm for emitting elements as the watermark passes. I believe Gearpump
also uses a sorting buffer and processes in order, and we do have a
Gearpump runner still here in our repo.
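A watermark-driven sorting buffer of this kind can be sketched roughly as follows. This is an illustrative Python model only, not the implementation in Trill, Gearpump, or the PR: the class name SortingBuffer and its methods are hypothetical, and it assumes that a watermark W means no further element with timestamp below W is expected. Elements arriving behind the watermark are dropped, which corresponds to an allowed lateness of zero.

```python
import heapq
from itertools import count


class SortingBuffer:
    """Holds (timestamp, value) pairs and releases them in timestamp order.

    Hypothetical helper for illustration; not a Beam API.
    """

    def __init__(self):
        self._heap = []
        self._seq = count()  # tie-breaker: equal timestamps keep arrival order
        self._watermark = float("-inf")
        self.dropped = []

    def add(self, timestamp, value):
        # An element behind the watermark can no longer be emitted in order.
        # With zero allowed lateness (an assumption here) it is dropped.
        if timestamp < self._watermark:
            self.dropped.append((timestamp, value))
            return
        heapq.heappush(self._heap, (timestamp, next(self._seq), value))

    def advance_watermark(self, watermark):
        """Move the watermark forward and return elements now safe to emit."""
        self._watermark = max(self._watermark, watermark)
        ready = []
        while self._heap and self._heap[0][0] < self._watermark:
            ts, _, value = heapq.heappop(self._heap)
            ready.append((ts, value))
        return ready


buf = SortingBuffer()
buf.add(5, "a")
buf.add(3, "b")
buf.add(8, "c")
print(buf.advance_watermark(6))  # elements with timestamp below 6, sorted
```

The downstream stateful operator then only ever sees the lists returned by advance_watermark, so it observes elements in timestamp order regardless of arrival order.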
Kenn
On Mon, Nov 4, 2019 at 3:54 AM Jan Lukavský <je...@seznam.cz> wrote:
> Hi,
>
> there has been some development around this [1], which essentially
> concludes that this feature can currently be safely supported only by
> the direct runner, the Flink runner (both batch and streaming,
> non-portable only) and Spark (batch, legacy only). This is because time
> sorting relies heavily on timers firing in strict order. If they do
> not, the result might be unpredictable data loss, due to window cleanup
> of state occurring before all elements have been emitted (note that this
> might generally happen even to current user pipelines!). I can link
> issues [2], [3] and [4] to [5], but the question is: with so few
> runners able to support this, what would be the best way to
> incorporate this into an upcoming release (I'm assuming that this will
> pass a vote, which is not known yet)? I'd say that the best way would be
> for the affected runners to fail to execute the pipeline until the
> respective issues are resolved. Another option would be to block this
> until the issues are resolved in the runners, but that might delay the
> availability of this feature for an unknown time.
>
> Thanks for any opinions,
>
> Jan
>
> [1]
>
> https://lists.apache.org/thread.html/71a8f48ca518f1f2e6e9b1284114624670884775d209b0097f68264b@%3Cdev.beam.apache.org%3E
>
> [2] https://issues.apache.org/jira/browse/BEAM-8459
>
> [3] https://issues.apache.org/jira/browse/BEAM-8460
>
> [4] https://issues.apache.org/jira/browse/BEAM-8543
>
> [5] https://issues.apache.org/jira/browse/BEAM-8550
>
Re: Proposal: @RequiresTimeSortedInput
Posted by Jan Lukavský <je...@seznam.cz>.
Hi,
there has been some development around this [1], which essentially
concludes that this feature can currently be safely supported only by
the direct runner, the Flink runner (both batch and streaming,
non-portable only) and Spark (batch, legacy only). This is because time
sorting relies heavily on timers firing in strict order. If they do
not, the result might be unpredictable data loss, due to window cleanup
of state occurring before all elements have been emitted (note that this
might generally happen even to current user pipelines!). I can link
issues [2], [3] and [4] to [5], but the question is: with so few
runners able to support this, what would be the best way to
incorporate this into an upcoming release (I'm assuming that this will
pass a vote, which is not known yet)? I'd say that the best way would be
for the affected runners to fail to execute the pipeline until the
respective issues are resolved. Another option would be to block this
until the issues are resolved in the runners, but that might delay the
availability of this feature for an unknown time.
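The data-loss scenario can be illustrated with a deliberately simplified Python model. It is not runner code: the timer kinds "flush" and "cleanup" and the function run_timers are hypothetical names, standing for the timer that emits sorted elements and the timer that clears state at window expiry.

```python
def run_timers(buffered_timestamps, timers):
    """Fire timers in the given order against a buffer of element timestamps.

    timers is a list of (kind, timestamp) pairs, where kind is "flush"
    (emit everything up to the timestamp) or "cleanup" (clear all state).
    Returns (emitted, lost).
    """
    state = sorted(buffered_timestamps)
    emitted, lost = [], []
    for kind, ts in timers:
        if kind == "flush":
            emitted.extend(t for t in state if t <= ts)
            state = [t for t in state if t > ts]
        elif kind == "cleanup":
            # Whatever is still buffered at cleanup is gone for good.
            lost.extend(state)
            state = []
    return emitted, lost


timers = [("cleanup", 10), ("flush", 9)]

# A runner that fires timers strictly by timestamp flushes before cleanup.
in_order = sorted(timers, key=lambda t: t[1])
print(run_timers([3, 7, 9], in_order))

# A runner that fires them in arrival order clears state first.
print(run_timers([3, 7, 9], timers))
```

In the second call the cleanup timer runs before the flush timer, so every buffered element ends up in the lost list and nothing is emitted.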
Thanks for any opinions,
Jan
[1]
https://lists.apache.org/thread.html/71a8f48ca518f1f2e6e9b1284114624670884775d209b0097f68264b@%3Cdev.beam.apache.org%3E
[2] https://issues.apache.org/jira/browse/BEAM-8459
[3] https://issues.apache.org/jira/browse/BEAM-8460
[4] https://issues.apache.org/jira/browse/BEAM-8543
[5] https://issues.apache.org/jira/browse/BEAM-8550