You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2020/01/30 12:39:00 UTC

[BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Hi,

PR [1] (issue [2]) went though code review, and according to [3] seems 
to me to be ready for merge. Current state of the implementation is that 
it is supported only for direct runner, legacy flink runner (batch and 
streaming) and legacy spark (batch). It could be supported by all other 
(streaming) runners using StatefulDoFnRunner, provided the runner can 
make guarantees about ordering of timer firings (which is unfortunately 
the case only for legacy flink and direct runner, at least for now - 
related issues are mentioned multiple times on other threads). 
Implementation for other batch runners should be as straightforward as 
adding sorting by event timestamp before stateful dofn (in case where 
the runner doesn't sort already - e.g. Dataflow - in which case the 
annotation can be simply ignored - hence support for batch Dataflow 
seems to be a no-op).

There has been some slight controversy about this feature, but current 
feature proposing and implementing guidelines do not cover how to 
resolve those, so I'm using this opportunity to let the community know, 
that there is a plan to merge this feature, unless there is some veto 
(please provide specific reasons for that in that case). The plan is to 
merge this in the second part of next week, unless there is a veto.

Thanks,

  Jan

[1] https://github.com/apache/beam/pull/8774

[2] https://issues.apache.org/jira/browse/BEAM-8550

[3] https://beam.apache.org/contribute/committer-guide/


Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Jan Lukavský <je...@seznam.cz>.
And as a quick summary a pipeline with @RequiresTimeSortedInput will:

  a) work well on streaming pipelines run on direct java and 
non-portable flink, will fail on every other streaming runner

  b) work well on batch non-portable flink, legacy spark and batch dataflow

  c) from what I can tell, the code path seems to be supported for batch 
portable flink as well (if portable batch flink runner uses mostly the 
same code path, which seems it should)

If I'm not mistaken, then the only actual issue seems to be with the new 
spark runner and then all remaining streaming runners (dataflow, 
portable flink, jet, samza). The streaming case can probably be solved 
easily in one shot (as mentioned in previous email).

Jan

On 2/7/20 8:20 PM, Jan Lukavský wrote:
>
> Hi Kenn,
>
> I think that this approach is not well maintainable and doesn't scale. 
> Main reasons:
>
>  a) modifying core has by definition some impact on runners, so 
> modifying core would imply necessity to modify all runners
>
>  b) having to implement core feature for all existing runners will 
> make any modification to core prohibitively expensive
>
>  c) even if we accept this, there can be runners that are outside of 
> beam repo (or even closed source!)
>
> Therefore I think, that the correct and scalable approach would be to 
> split this into several pieces:
>
>  1) define pipeline requirements (this is pretty much similar to how 
> we currently scope @Category(ValidatesRunner.class) tests
>
>  2) let pipeline infer it's requirements prior to being translated via 
> runner
>
>  3) runner can check the set of required features and their support 
> and reject the pipeline if some feature is missing
>
> This could even replace the annotations used in validates runner 
> tests, because each runner would simply execute all tests it has 
> enough features to run.
>
> But as I mentioned - this is pretty much deep change. I don't know how 
> to safely do this for current runners, but to actually implement the 
> feature (it seems to be to me nearly equally complicated to fail 
> pipeline in batch case and to actually implement the sorting). It 
> would be super cool if anyone would be interested in implementing this 
> in runners that don't currently support it. A side note - currently 
> the annotation is not supported by all streaming runners due to 
> missing guarantees for timers ordering (which can lead to data losss). 
> I think I have found a solution to this, see [1], but I'd like to be 
> 100% sure, before enabling the support (I'm not sure what is the 
> impact of mis-ordered timers on output timestamps, and so on, and so 
> forth).
>
> Jan
>
> [1] 
> https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
>
> On 2/7/20 7:53 PM, Kenneth Knowles wrote:
>> I see. It is good to see that the pipeline will at least fail. 
>> However, the expect approach here is that the pipeline is rejected 
>> prior to execution. That is a primary reason for our 
>> annotation-driven API style; it allows much better "static" analysis 
>> by a runner, so we don't have to wait and fail late. Here is an 
>> example: 
>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940 
>>
>>
>> Kenn
>>
>> On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je.ik@seznam.cz 
>> <ma...@seznam.cz>> wrote:
>>
>>     Hi Kenn,
>>
>>     that should not be the case. Care was taken to fail streaming
>>     pipeline which needs this ability and the runner doesn't support
>>     this [1]. It is true, however, that a batch pipeline will not
>>     fail, because there is no generic (runner agnostic) way of
>>     supporting this transform in batch case (which is why the
>>     annotation was needed). Failing batch pipelines in this case
>>     would mean runners have to understand this annotation, which is
>>     pretty much close to implementing this feature as a whole.
>>
>>     This applies generally to any core functionality, it might take
>>     some time before runners fully support this. I don't know how to
>>     solve it, maybe add record to capability matrix? I can imagine a
>>     fully generic solution (runners might publish their capabilities
>>     and pipeline might be validated against these capabilities at
>>     pipeline build time), but that is obviously out of scope of the
>>     annotation.
>>
>>     Jan
>>
>>     [1]
>>     https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
>>
>>     On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>>>     There is a major problem with this merge: the runners that do
>>>     not support it do not reject pipelines that need this feature.
>>>     They will silently produce the wrong answer, causing data loss.
>>>
>>>     Kenn
>>>
>>>     On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je.ik@seznam.cz
>>>     <ma...@seznam.cz>> wrote:
>>>
>>>         Hi,
>>>
>>>         the PR was merged to master and a few follow-up issues, were
>>>         created,
>>>         mainly [1] and [2]. I didn't find any reference to
>>>         SortedMapState in
>>>         JIRA, is there any tracking issue for that that I can link
>>>         to? I also
>>>         added link to design document here [3].
>>>
>>>         [1] https://issues.apache.org/jira/browse/BEAM-9256
>>>
>>>         [2] https://issues.apache.org/jira/browse/BEAM-9257
>>>
>>>         [3]
>>>         https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>>
>>>         On 1/30/20 1:39 PM, Jan Lukavský wrote:
>>>         > Hi,
>>>         >
>>>         > PR [1] (issue [2]) went though code review, and according
>>>         to [3] seems
>>>         > to me to be ready for merge. Current state of the
>>>         implementation is
>>>         > that it is supported only for direct runner, legacy flink
>>>         runner
>>>         > (batch and streaming) and legacy spark (batch). It could
>>>         be supported
>>>         > by all other (streaming) runners using StatefulDoFnRunner,
>>>         provided
>>>         > the runner can make guarantees about ordering of timer
>>>         firings (which
>>>         > is unfortunately the case only for legacy flink and direct
>>>         runner, at
>>>         > least for now - related issues are mentioned multiple
>>>         times on other
>>>         > threads). Implementation for other batch runners should be as
>>>         > straightforward as adding sorting by event timestamp
>>>         before stateful
>>>         > dofn (in case where the runner doesn't sort already - e.g.
>>>         Dataflow -
>>>         > in which case the annotation can be simply ignored - hence
>>>         support for
>>>         > batch Dataflow seems to be a no-op).
>>>         >
>>>         > There has been some slight controversy about this feature,
>>>         but current
>>>         > feature proposing and implementing guidelines do not cover
>>>         how to
>>>         > resolve those, so I'm using this opportunity to let the
>>>         community
>>>         > know, that there is a plan to merge this feature, unless
>>>         there is some
>>>         > veto (please provide specific reasons for that in that
>>>         case). The plan
>>>         > is to merge this in the second part of next week, unless
>>>         there is a veto.
>>>         >
>>>         > Thanks,
>>>         >
>>>         >  Jan
>>>         >
>>>         > [1] https://github.com/apache/beam/pull/8774
>>>         >
>>>         > [2] https://issues.apache.org/jira/browse/BEAM-8550
>>>         >
>>>         > [3] https://beam.apache.org/contribute/committer-guide/
>>>         >
>>>

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Robert,

thanks for this insight. I think that this sort of uncovered additional 
question - I'm not saying that I follow every thread in dev@, but I 
didn't notice anything about "trying to stabilize the protos", which is 
again where I think these big milestones probably should be defined in 
front in a form of BIP, BEP or whatever. I will try to get more time to 
invest into this (creating the cwiki, BIP template and some basic first 
BIP to have something to iterate on).

Jan

On 2/7/20 10:51 PM, Robert Bradshaw wrote:
> There are two separable concerns here.
>
> (1) The @RequiresTimeSortedInput feature itself. This is a subtle
> feature needed for certain pipelines, and if anything Jan has gone the
> extra mile discussing, documenting, and designing this and trying to
> reach consensus. I feel like there has been a failure in the community
> (myself included) to either fully accept it, or come up with sound
> reasons to reject it, in a timely manner. (This is one of the things I
> hope BEPs could address.) The feature seems similar in spirit to
> @RequiresStableInputs which I also find a bit icky but can't think of
> a way around. (My ideal implementation for both would be to express
> this in terms of a naive implementation that could be swapped out by
> more advanced runners...) That being said, I don't think we should
> block on this forever.
>
> (2) Especially as we're trying to stabilize the protos, how can one
> safely add constraints like this such that runners will reject rather
> than execute pipelines with unsatisfied constraints? For SDKs, we're
> thinking about adding the notion of capabilities (as a list, or
> possibly mapping, of URNs that get attached to an environment. Perhaps
> a pipeline could likewise have a set of requirements for those "new"
> features that augments what can be inferred by looking at the set of
> transform URNs. In this case, @RequiresTimeSortedInput would be such a
> requirement attached to any pipeline using this feature, and its
> contract would be to look at (and respect) certain bits on the DoFns,
> and a runner must reject any pipeline with unknown requirements. (If
> it understands a requirement, it could reject it based on its ability
> to satisfy the contract as it is actually used in the pipeline).
>
> On Fri, Feb 7, 2020 at 12:31 PM Kenneth Knowles <ke...@apache.org> wrote:
>> TL;DR I am not suggesting that you must implement this for any runner. I'm afraid I do have to propose this change be rolled back before release 2.21.0 unless we fix this. I think the fix is easily achieved.
>>
>> Clarifications inline.
>>
>> On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> Hi Kenn,
>>>
>>> I think that this approach is not well maintainable and doesn't scale. Main reasons:
>>>
>>>   a) modifying core has by definition some impact on runners, so modifying core would imply necessity to modify all runners
>> My concern is not about all changes to "core" but only changes to the model, which should be extraordinarily rare. They must receive extreme scrutiny and require a very high level of consensus. It is true that every runner needs to either correctly execute or refuse to execute every pipeline, to the extent possible. For the case we are talking about it is very easy to meet this requirement.
>>
>>>   b) having to implement core feature for all existing runners will make any modification to core prohibitively expensive
>> No one is suggesting this. I am saying that you need to write the 1 line that I linked to "if (usesRequiresTimeSortedInput) then reject pipeline" so the runner fails before it begins processing data, potentially consuming non-replayable messages.
>>
>>>   c) even if we accept this, there can be runners that are outside of beam repo (or even closed source!)
>> Indeed. And those runners need time to adapt to the new proto fields. I did not mention it this time, because the proto is not considered stable. But very soon it will be. At that point additions like this will have to be fully specified and added to the proto long before they are enabled for use. That way all runners can adjust. The proper order is (1) add model feature (2) make runners reject it, unsupported (3) add functionality to SDK (4) add to some runners and enable.
>>
>>> Therefore I think, that the correct and scalable approach would be to split this into several pieces:
>>>
>>>   1) define pipeline requirements (this is pretty much similar to how we currently scope @Category(ValidatesRunner.class) tests
>>>
>>>   2) let pipeline infer it's requirements prior to being translated via runner
>>>
>>>   3) runner can check the set of required features and their support and reject the pipeline if some feature is missing
>> This is exactly what happens today, but was not included in your change. The pipeline proto (or the Java pipeline object) clearly contain all the needed information. Whether pipeline summarizes it or the runner implements a trivial PipelineVisitor is not important.
>>
>>> This could even replace the annotations used in validates runner tests, because each runner would simply execute all tests it has enough features to run.
>> What you have described is exactly what happens today.
>>
>>> But as I mentioned - this is pretty much deep change. I don't know how to safely do this for current runners, but to actually implement the feature (it seems to be to me nearly equally complicated to fail pipeline in batch case and to actually implement the sorting).
>> Indeed. This feature hasn't really got consensus. The proposal thread [1] never really concluded affirmatively [1]. The [VOTE] thread indicates a clear *lack* of consensus, with all people who weighed in asking to raise awareness and build more support and consensus. Robert made the good point that if it is (a) useful and (b) not easy for users to do themselves, then we should consider it, even if most people here are not interested in the feature. So that is the closest thing to approval that this feature has. But getting more people interested and on board would get better feedback and achieve a better result for our users.
>>
>> And as a final note, the PR was not reviewed by the core people who built out state & timers, nor those who built out DoFn annotation systems, nor any runner author, nor those working on the Beam model protos. You really should have gotten most of these people involved. They would likely have caught the issues described here.
>>
>> The specific action that I am proposing is to implement the 1 liner described in all runners. It might be best to roll back and proceed with steps 1-4 I outlined above, so we can be sure things are proceeding well.
>>
>> Kenn
>>
>> [1] https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
>> [2] https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E
>>> It would be super cool if anyone would be interested in implementing this in runners that don't currently support it. A side note - currently the annotation is not supported by all streaming runners due to missing guarantees for timers ordering (which can lead to data losss). I think I have found a solution to this, see [1], but I'd like to be 100% sure, before enabling the support (I'm not sure what is the impact of mis-ordered timers on output timestamps, and so on, and so forth).
>>>
>>> Jan
>>>
>>> [1] https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
>>>
>>> On 2/7/20 7:53 PM, Kenneth Knowles wrote:
>>>
>>> I see. It is good to see that the pipeline will at least fail. However, the expect approach here is that the pipeline is rejected prior to execution. That is a primary reason for our annotation-driven API style; it allows much better "static" analysis by a runner, so we don't have to wait and fail late. Here is an example: https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
>>>
>>> Kenn
>>>
>>> On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>> Hi Kenn,
>>>>
>>>> that should not be the case. Care was taken to fail streaming pipeline which needs this ability and the runner doesn't support this [1]. It is true, however, that a batch pipeline will not fail, because there is no generic (runner agnostic) way of supporting this transform in batch case (which is why the annotation was needed). Failing batch pipelines in this case would mean runners have to understand this annotation, which is pretty much close to implementing this feature as a whole.
>>>>
>>>> This applies generally to any core functionality, it might take some time before runners fully support this. I don't know how to solve it, maybe add record to capability matrix? I can imagine a fully generic solution (runners might publish their capabilities and pipeline might be validated against these capabilities at pipeline build time), but that is obviously out of scope of the annotation.
>>>>
>>>> Jan
>>>>
>>>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
>>>>
>>>> On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>>>>
>>>> There is a major problem with this merge: the runners that do not support it do not reject pipelines that need this feature. They will silently produce the wrong answer, causing data loss.
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>> Hi,
>>>>>
>>>>> the PR was merged to master and a few follow-up issues, were created,
>>>>> mainly [1] and [2]. I didn't find any reference to SortedMapState in
>>>>> JIRA, is there any tracking issue for that that I can link to? I also
>>>>> added link to design document here [3].
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/BEAM-9256
>>>>>
>>>>> [2] https://issues.apache.org/jira/browse/BEAM-9257
>>>>>
>>>>> [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>>>>
>>>>> On 1/30/20 1:39 PM, Jan Lukavský wrote:
>>>>>> Hi,
>>>>>>
>>>>>> PR [1] (issue [2]) went though code review, and according to [3] seems
>>>>>> to me to be ready for merge. Current state of the implementation is
>>>>>> that it is supported only for direct runner, legacy flink runner
>>>>>> (batch and streaming) and legacy spark (batch). It could be supported
>>>>>> by all other (streaming) runners using StatefulDoFnRunner, provided
>>>>>> the runner can make guarantees about ordering of timer firings (which
>>>>>> is unfortunately the case only for legacy flink and direct runner, at
>>>>>> least for now - related issues are mentioned multiple times on other
>>>>>> threads). Implementation for other batch runners should be as
>>>>>> straightforward as adding sorting by event timestamp before stateful
>>>>>> dofn (in case where the runner doesn't sort already - e.g. Dataflow -
>>>>>> in which case the annotation can be simply ignored - hence support for
>>>>>> batch Dataflow seems to be a no-op).
>>>>>>
>>>>>> There has been some slight controversy about this feature, but current
>>>>>> feature proposing and implementing guidelines do not cover how to
>>>>>> resolve those, so I'm using this opportunity to let the community
>>>>>> know, that there is a plan to merge this feature, unless there is some
>>>>>> veto (please provide specific reasons for that in that case). The plan
>>>>>> is to merge this in the second part of next week, unless there is a veto.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>>   Jan
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/8774
>>>>>>
>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-8550
>>>>>>
>>>>>> [3] https://beam.apache.org/contribute/committer-guide/
>>>>>>

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Robert Bradshaw <ro...@google.com>.
There are two separable concerns here.

(1) The @RequiresTimeSortedInput feature itself. This is a subtle
feature needed for certain pipelines, and if anything Jan has gone the
extra mile discussing, documenting, and designing this and trying to
reach consensus. I feel like there has been a failure in the community
(myself included) to either fully accept it, or come up with sound
reasons to reject it, in a timely manner. (This is one of the things I
hope BEPs could address.) The feature seems similar in spirit to
@RequiresStableInputs which I also find a bit icky but can't think of
a way around. (My ideal implementation for both would be to express
this in terms of a naive implementation that could be swapped out by
more advanced runners...) That being said, I don't think we should
block on this forever.

(2) Especially as we're trying to stabilize the protos, how can one
safely add constraints like this such that runners will reject rather
than execute pipelines with unsatisfied constraints? For SDKs, we're
thinking about adding the notion of capabilities (as a list, or
possibly mapping, of URNs that get attached to an environment. Perhaps
a pipeline could likewise have a set of requirements for those "new"
features that augments what can be inferred by looking at the set of
transform URNs. In this case, @RequiresTimeSortedInput would be such a
requirement attached to any pipeline using this feature, and its
contract would be to look at (and respect) certain bits on the DoFns,
and a runner must reject any pipeline with unknown requirements. (If
it understands a requirement, it could reject it based on its ability
to satisfy the contract as it is actually used in the pipeline).

On Fri, Feb 7, 2020 at 12:31 PM Kenneth Knowles <ke...@apache.org> wrote:
>
> TL;DR I am not suggesting that you must implement this for any runner. I'm afraid I do have to propose this change be rolled back before release 2.21.0 unless we fix this. I think the fix is easily achieved.
>
> Clarifications inline.
>
> On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>> Hi Kenn,
>>
>> I think that this approach is not well maintainable and doesn't scale. Main reasons:
>>
>>  a) modifying core has by definition some impact on runners, so modifying core would imply necessity to modify all runners
>
> My concern is not about all changes to "core" but only changes to the model, which should be extraordinarily rare. They must receive extreme scrutiny and require a very high level of consensus. It is true that every runner needs to either correctly execute or refuse to execute every pipeline, to the extent possible. For the case we are talking about it is very easy to meet this requirement.
>
>>  b) having to implement core feature for all existing runners will make any modification to core prohibitively expensive
>
> No one is suggesting this. I am saying that you need to write the 1 line that I linked to "if (usesRequiresTimeSortedInput) then reject pipeline" so the runner fails before it begins processing data, potentially consuming non-replayable messages.
>
>>
>>  c) even if we accept this, there can be runners that are outside of beam repo (or even closed source!)
>
> Indeed. And those runners need time to adapt to the new proto fields. I did not mention it this time, because the proto is not considered stable. But very soon it will be. At that point additions like this will have to be fully specified and added to the proto long before they are enabled for use. That way all runners can adjust. The proper order is (1) add model feature (2) make runners reject it, unsupported (3) add functionality to SDK (4) add to some runners and enable.
>
>>
>> Therefore I think, that the correct and scalable approach would be to split this into several pieces:
>>
>>  1) define pipeline requirements (this is pretty much similar to how we currently scope @Category(ValidatesRunner.class) tests
>>
>>  2) let pipeline infer it's requirements prior to being translated via runner
>>
>>  3) runner can check the set of required features and their support and reject the pipeline if some feature is missing
>
> This is exactly what happens today, but was not included in your change. The pipeline proto (or the Java pipeline object) clearly contain all the needed information. Whether pipeline summarizes it or the runner implements a trivial PipelineVisitor is not important.
>
>> This could even replace the annotations used in validates runner tests, because each runner would simply execute all tests it has enough features to run.
>
> What you have described is exactly what happens today.
>
>>
>> But as I mentioned - this is pretty much deep change. I don't know how to safely do this for current runners, but to actually implement the feature (it seems to be to me nearly equally complicated to fail pipeline in batch case and to actually implement the sorting).
>
> Indeed. This feature hasn't really got consensus. The proposal thread [1] never really concluded affirmatively [1]. The [VOTE] thread indicates a clear *lack* of consensus, with all people who weighed in asking to raise awareness and build more support and consensus. Robert made the good point that if it is (a) useful and (b) not easy for users to do themselves, then we should consider it, even if most people here are not interested in the feature. So that is the closest thing to approval that this feature has. But getting more people interested and on board would get better feedback and achieve a better result for our users.
>
> And as a final note, the PR was not reviewed by the core people who built out state & timers, nor those who built out DoFn annotation systems, nor any runner author, nor those working on the Beam model protos. You really should have gotten most of these people involved. They would likely have caught the issues described here.
>
> The specific action that I am proposing is to implement the 1 liner described in all runners. It might be best to roll back and proceed with steps 1-4 I outlined above, so we can be sure things are proceeding well.
>
> Kenn
>
> [1] https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
> [2] https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E
>>
>> It would be super cool if anyone would be interested in implementing this in runners that don't currently support it. A side note - currently the annotation is not supported by all streaming runners due to missing guarantees for timers ordering (which can lead to data losss). I think I have found a solution to this, see [1], but I'd like to be 100% sure, before enabling the support (I'm not sure what is the impact of mis-ordered timers on output timestamps, and so on, and so forth).
>>
>> Jan
>>
>> [1] https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
>>
>> On 2/7/20 7:53 PM, Kenneth Knowles wrote:
>>
>> I see. It is good to see that the pipeline will at least fail. However, the expect approach here is that the pipeline is rejected prior to execution. That is a primary reason for our annotation-driven API style; it allows much better "static" analysis by a runner, so we don't have to wait and fail late. Here is an example: https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
>>
>> Kenn
>>
>> On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>> Hi Kenn,
>>>
>>> that should not be the case. Care was taken to fail streaming pipeline which needs this ability and the runner doesn't support this [1]. It is true, however, that a batch pipeline will not fail, because there is no generic (runner agnostic) way of supporting this transform in batch case (which is why the annotation was needed). Failing batch pipelines in this case would mean runners have to understand this annotation, which is pretty much close to implementing this feature as a whole.
>>>
>>> This applies generally to any core functionality, it might take some time before runners fully support this. I don't know how to solve it, maybe add record to capability matrix? I can imagine a fully generic solution (runners might publish their capabilities and pipeline might be validated against these capabilities at pipeline build time), but that is obviously out of scope of the annotation.
>>>
>>> Jan
>>>
>>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
>>>
>>> On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>>>
>>> There is a major problem with this merge: the runners that do not support it do not reject pipelines that need this feature. They will silently produce the wrong answer, causing data loss.
>>>
>>> Kenn
>>>
>>> On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Hi,
>>>>
>>>> the PR was merged to master and a few follow-up issues, were created,
>>>> mainly [1] and [2]. I didn't find any reference to SortedMapState in
>>>> JIRA, is there any tracking issue for that that I can link to? I also
>>>> added link to design document here [3].
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-9256
>>>>
>>>> [2] https://issues.apache.org/jira/browse/BEAM-9257
>>>>
>>>> [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>>>
>>>> On 1/30/20 1:39 PM, Jan Lukavský wrote:
>>>> > Hi,
>>>> >
>>>> > PR [1] (issue [2]) went though code review, and according to [3] seems
>>>> > to me to be ready for merge. Current state of the implementation is
>>>> > that it is supported only for direct runner, legacy flink runner
>>>> > (batch and streaming) and legacy spark (batch). It could be supported
>>>> > by all other (streaming) runners using StatefulDoFnRunner, provided
>>>> > the runner can make guarantees about ordering of timer firings (which
>>>> > is unfortunately the case only for legacy flink and direct runner, at
>>>> > least for now - related issues are mentioned multiple times on other
>>>> > threads). Implementation for other batch runners should be as
>>>> > straightforward as adding sorting by event timestamp before stateful
>>>> > dofn (in case where the runner doesn't sort already - e.g. Dataflow -
>>>> > in which case the annotation can be simply ignored - hence support for
>>>> > batch Dataflow seems to be a no-op).
>>>> >
>>>> > There has been some slight controversy about this feature, but current
>>>> > feature proposing and implementing guidelines do not cover how to
>>>> > resolve those, so I'm using this opportunity to let the community
>>>> > know, that there is a plan to merge this feature, unless there is some
>>>> > veto (please provide specific reasons for that in that case). The plan
>>>> > is to merge this in the second part of next week, unless there is a veto.
>>>> >
>>>> > Thanks,
>>>> >
>>>> >  Jan
>>>> >
>>>> > [1] https://github.com/apache/beam/pull/8774
>>>> >
>>>> > [2] https://issues.apache.org/jira/browse/BEAM-8550
>>>> >
>>>> > [3] https://beam.apache.org/contribute/committer-guide/
>>>> >

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Jan Lukavský <je...@seznam.cz>.
I created https://issues.apache.org/jira/browse/BEAM-9273 and will send 
a PR for that in a few days.

On 2/8/20 12:04 AM, Kenneth Knowles wrote:
> Regarding StatefulDoFnRunner: this fails during pipeline execution, 
> too late, and as you noted is just a utility that a runner may 
> optionally use. The change needs to be in the runner's run() method 
> prior to execution starting. Here is a specific PR that demonstrates 
> the technique: https://github.com/apache/beam/pull/3420. You could 
> make some generalized shared code that runner's could share I suppose.
>
> Regarding having a pipeline-level summary of required features: this 
> introduces an opportunity for inconsistency where the pipeline misses 
> some needed features. Jan added the needed info the proto so it can be 
> scraped out easily, though it is still easy for a runner to just not 
> be updated. So that's a proto design issue. With PTransform URNs if 
> there is a leaf PTransform with an unknown URN the runner necessarily 
> fails. That is a better model.
>
> Regarding consensus: it is true that Jan did reach out repeatedly and 
> it was the community that didn't engage. It is reasonable to move to 
> implementation eventually. Yet we still need to avoid the code-level 
> issues so need review. On this PR, to name names, I would consider 
> Reuven or Luke to be valuable reviewers. Also the email thread suffers 
> from a tragedy of the commons. You did directly ask for review. But 
> mentions and using GitHub's "review request" are probably a good way 
> to get the PR actually onto someone's dashboard.
>
> Kenn
>
> On Fri, Feb 7, 2020 at 1:52 PM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     I reviewed closely the runners ad it seems to me that:
>
>      - all batch runners that would fail to support the annotation
>     will fail already (spark structured streaming, apex) due to
>     missing support for state or timers
>
>      - streaming runners must explicitly enable this, _as long as they
>     use StatefulDoFnRunner_, which is the case for apex, flink and samza
>
>     I will explicitly disable any pipeline with this annotation for:
>
>      - dataflow, jet and gearpump (because I don't see usage of
>     StatefulDoFnRunner, although I though there was one, that's my
>     mistake)
>
>      - all batch runners should either support the annotation or fail
>     already (due to missing support for state or timers)
>
>     Does this proposal solve the issues you see?
>
>     Regarding the process of introducing this annotation I tried
>     really hard to get to the best consensus I could. The same holds
>     true for getting core people involved in the review process
>     (explicitly mentioned in the PR, multiple mailing list threads).
>     The PR was opened for discussion for more than half a year. But
>     because I agree with you, I proposed the BIP, so that we can have
>     a more explicit process for arriving at a consensus for features
>     like this. I'd be happy though, if we can get to consensus about
>     what to do now (if the steps I wrote above will solve every
>     doubts) and have a deeper process for similar features for future
>     cases. As I mentioned this feature is already implemented and
>     having open PR into core for nearly a year is expensive to keep it
>     in sync with master.
>
>     On 2/7/20 9:31 PM, Kenneth Knowles wrote:
>>     TL;DR I am not suggesting that you must implement this for any
>>     runner. I'm afraid I do have to propose this change be rolled
>>     back before release 2.21.0 unless we fix this. I think the fix is
>>     easily achieved.
>>
>>     Clarifications inline.
>>
>>     On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský <je.ik@seznam.cz
>>     <ma...@seznam.cz>> wrote:
>>
>>         Hi Kenn,
>>
>>         I think that this approach is not well maintainable and
>>         doesn't scale. Main reasons:
>>
>>          a) modifying core has by definition some impact on runners,
>>         so modifying core would imply necessity to modify all runners
>>
>>     My concern is not about all changes to "core" but only changes to
>>     the model, which should be extraordinarily rare. They must
>>     receive extreme scrutiny and require a very high level of
>>     consensus. It is true that every runner needs to either correctly
>>     execute or refuse to execute every pipeline, to the extent
>>     possible. For the case we are talking about it is very easy to
>>     meet this requirement.
>>
>>          b) having to implement core feature for all existing runners
>>         will make any modification to core prohibitively expensive
>>
>>     No one is suggesting this. I am saying that you need to write the
>>     1 line that I linked to "if (usesRequiresTimeSortedInput) then
>>     reject pipeline" so the runner fails before it begins processing
>>     data, potentially consuming non-replayable messages.
>>
>>          c) even if we accept this, there can be runners that are
>>         outside of beam repo (or even closed source!)
>>
>>     Indeed. And those runners need time to adapt to the new proto
>>     fields. I did not mention it this time, because the proto is not
>>     considered stable. But very soon it will be. At that point
>>     additions like this will have to be fully specified and added to
>>     the proto long before they are enabled for use. That way all
>>     runners can adjust. The proper order is (1) add model feature (2)
>>     make runners reject it, unsupported (3) add functionality to SDK
>>     (4) add to some runners and enable.
>>
>>         Therefore I think, that the correct and scalable approach
>>         would be to split this into several pieces:
>>
>>          1) define pipeline requirements (this is pretty much similar
>>         to how we currently scope @Category(ValidatesRunner.class) tests
>>
>>          2) let pipeline infer it's requirements prior to being
>>         translated via runner
>>
>>          3) runner can check the set of required features and their
>>         support and reject the pipeline if some feature is missing
>>
>>     This is exactly what happens today, but was not included in your
>>     change. The pipeline proto (or the Java pipeline object) clearly
>>     contain all the needed information. Whether pipeline summarizes
>>     it or the runner implements a trivial PipelineVisitor is not
>>     important.
>>
>>         This could even replace the annotations used in validates
>>         runner tests, because each runner would simply execute all
>>         tests it has enough features to run.
>>
>>     What you have described is exactly what happens today.
>>
>>         But as I mentioned - this is pretty much deep change. I don't
>>         know how to safely do this for current runners, but to
>>         actually implement the feature (it seems to be to me nearly
>>         equally complicated to fail pipeline in batch case and to
>>         actually implement the sorting).
>>
>>     Indeed. This feature hasn't really got consensus. The proposal
>>     thread [1] never really concluded affirmatively [1]. The [VOTE]
>>     thread indicates a clear *lack* of consensus, with all people who
>>     weighed in asking to raise awareness and build more support and
>>     consensus. Robert made the good point that if it is (a) useful
>>     and (b) not easy for users to do themselves, then we should
>>     consider it, even if most people here are not interested in the
>>     feature. So that is the closest thing to approval that this
>>     feature has. But getting more people interested and on board
>>     would get better feedback and achieve a better result for our users.
>>
>>     And as a final note, the PR was not reviewed by the core people
>>     who built out state & timers, nor those who built out DoFn
>>     annotation systems, nor any runner author, nor those working on
>>     the Beam model protos. You really should have gotten most of
>>     these people involved. They would likely have caught the issues
>>     described here.
>>
>>     The specific action that I am proposing is to implement the 1
>>     liner described in all runners. It might be best to roll back and
>>     proceed with steps 1-4 I outlined above, so we can be sure things
>>     are proceeding well.
>>
>>     Kenn
>>
>>     [1]
>>     https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
>>     [2]
>>     https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E
>>
>>         It would be super cool if anyone would be interested in
>>         implementing this in runners that don't currently support it.
>>         A side note - currently the annotation is not supported by
>>         all streaming runners due to missing guarantees for timers
>>         ordering (which can lead to data losss). I think I have found
>>         a solution to this, see [1], but I'd like to be 100% sure,
>>         before enabling the support (I'm not sure what is the impact
>>         of mis-ordered timers on output timestamps, and so on, and so
>>         forth).
>>
>>         Jan
>>
>>         [1]
>>         https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
>>
>>         On 2/7/20 7:53 PM, Kenneth Knowles wrote:
>>>         I see. It is good to see that the pipeline will at least
>>>         fail. However, the expect approach here is that the pipeline
>>>         is rejected prior to execution. That is a primary reason for
>>>         our annotation-driven API style; it allows much better
>>>         "static" analysis by a runner, so we don't have to wait and
>>>         fail late. Here is an example:
>>>         https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
>>>
>>>
>>>         Kenn
>>>
>>>         On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský
>>>         <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>
>>>             Hi Kenn,
>>>
>>>             that should not be the case. Care was taken to fail
>>>             streaming pipeline which needs this ability and the
>>>             runner doesn't support this [1]. It is true, however,
>>>             that a batch pipeline will not fail, because there is no
>>>             generic (runner agnostic) way of supporting this
>>>             transform in batch case (which is why the annotation was
>>>             needed). Failing batch pipelines in this case would mean
>>>             runners have to understand this annotation, which is
>>>             pretty much close to implementing this feature as a whole.
>>>
>>>             This applies generally to any core functionality, it
>>>             might take some time before runners fully support this.
>>>             I don't know how to solve it, maybe add record to
>>>             capability matrix? I can imagine a fully generic
>>>             solution (runners might publish their capabilities and
>>>             pipeline might be validated against these capabilities
>>>             at pipeline build time), but that is obviously out of
>>>             scope of the annotation.
>>>
>>>             Jan
>>>
>>>             [1]
>>>             https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
>>>
>>>             On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>>>>             There is a major problem with this merge: the runners
>>>>             that do not support it do not reject pipelines that
>>>>             need this feature. They will silently produce the wrong
>>>>             answer, causing data loss.
>>>>
>>>>             Kenn
>>>>
>>>>             On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský
>>>>             <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>>
>>>>                 Hi,
>>>>
>>>>                 the PR was merged to master and a few follow-up
>>>>                 issues, were created,
>>>>                 mainly [1] and [2]. I didn't find any reference to
>>>>                 SortedMapState in
>>>>                 JIRA, is there any tracking issue for that that I
>>>>                 can link to? I also
>>>>                 added link to design document here [3].
>>>>
>>>>                 [1] https://issues.apache.org/jira/browse/BEAM-9256
>>>>
>>>>                 [2] https://issues.apache.org/jira/browse/BEAM-9257
>>>>
>>>>                 [3]
>>>>                 https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>>>
>>>>                 On 1/30/20 1:39 PM, Jan Lukavský wrote:
>>>>                 > Hi,
>>>>                 >
>>>>                 > PR [1] (issue [2]) went though code review, and
>>>>                 according to [3] seems
>>>>                 > to me to be ready for merge. Current state of the
>>>>                 implementation is
>>>>                 > that it is supported only for direct runner,
>>>>                 legacy flink runner
>>>>                 > (batch and streaming) and legacy spark (batch).
>>>>                 It could be supported
>>>>                 > by all other (streaming) runners using
>>>>                 StatefulDoFnRunner, provided
>>>>                 > the runner can make guarantees about ordering of
>>>>                 timer firings (which
>>>>                 > is unfortunately the case only for legacy flink
>>>>                 and direct runner, at
>>>>                 > least for now - related issues are mentioned
>>>>                 multiple times on other
>>>>                 > threads). Implementation for other batch runners
>>>>                 should be as
>>>>                 > straightforward as adding sorting by event
>>>>                 timestamp before stateful
>>>>                 > dofn (in case where the runner doesn't sort
>>>>                 already - e.g. Dataflow -
>>>>                 > in which case the annotation can be simply
>>>>                 ignored - hence support for
>>>>                 > batch Dataflow seems to be a no-op).
>>>>                 >
>>>>                 > There has been some slight controversy about this
>>>>                 feature, but current
>>>>                 > feature proposing and implementing guidelines do
>>>>                 not cover how to
>>>>                 > resolve those, so I'm using this opportunity to
>>>>                 let the community
>>>>                 > know, that there is a plan to merge this feature,
>>>>                 unless there is some
>>>>                 > veto (please provide specific reasons for that in
>>>>                 that case). The plan
>>>>                 > is to merge this in the second part of next week,
>>>>                 unless there is a veto.
>>>>                 >
>>>>                 > Thanks,
>>>>                 >
>>>>                 >  Jan
>>>>                 >
>>>>                 > [1] https://github.com/apache/beam/pull/8774
>>>>                 >
>>>>                 > [2] https://issues.apache.org/jira/browse/BEAM-8550
>>>>                 >
>>>>                 > [3]
>>>>                 https://beam.apache.org/contribute/committer-guide/
>>>>                 >
>>>>

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Kenneth Knowles <ke...@apache.org>.
Regarding StatefulDoFnRunner: this fails during pipeline execution, too
late, and as you noted is just a utility that a runner may optionally use.
The change needs to be in the runner's run() method prior to execution
starting. Here is a specific PR that demonstrates the technique:
https://github.com/apache/beam/pull/3420. You could make some generalized
shared code that runner's could share I suppose.

Regarding having a pipeline-level summary of required features: this
introduces an opportunity for inconsistency where the pipeline misses some
needed features. Jan added the needed info the proto so it can be scraped
out easily, though it is still easy for a runner to just not be updated. So
that's a proto design issue. With PTransform URNs if there is a leaf
PTransform with an unknown URN the runner necessarily fails. That is a
better model.

Regarding consensus: it is true that Jan did reach out repeatedly and it
was the community that didn't engage. It is reasonable to move to
implementation eventually. Yet we still need to avoid the code-level issues
so need review. On this PR, to name names, I would consider Reuven or Luke
to be valuable reviewers. Also the email thread suffers from a tragedy of
the commons. You did directly ask for review. But mentions and using
GitHub's "review request" are probably a good way to get the PR actually
onto someone's dashboard.

Kenn

On Fri, Feb 7, 2020 at 1:52 PM Jan Lukavský <je...@seznam.cz> wrote:

> I reviewed closely the runners ad it seems to me that:
>
>  - all batch runners that would fail to support the annotation will fail
> already (spark structured streaming, apex) due to missing support for state
> or timers
>
>  - streaming runners must explicitly enable this, _as long as they use
> StatefulDoFnRunner_, which is the case for apex, flink and samza
>
> I will explicitly disable any pipeline with this annotation for:
>
>  - dataflow, jet and gearpump (because I don't see usage of
> StatefulDoFnRunner, although I though there was one, that's my mistake)
>
>  - all batch runners should either support the annotation or fail already
> (due to missing support for state or timers)
>
> Does this proposal solve the issues you see?
>
> Regarding the process of introducing this annotation I tried really hard
> to get to the best consensus I could. The same holds true for getting core
> people involved in the review process (explicitly mentioned in the PR,
> multiple mailing list threads). The PR was opened for discussion for more
> than half a year. But because I agree with you, I proposed the BIP, so that
> we can have a more explicit process for arriving at a consensus for
> features like this. I'd be happy though, if we can get to consensus about
> what to do now (if the steps I wrote above will solve every doubts) and
> have a deeper process for similar features for future cases. As I mentioned
> this feature is already implemented and having open PR into core for nearly
> a year is expensive to keep it in sync with master.
> On 2/7/20 9:31 PM, Kenneth Knowles wrote:
>
> TL;DR I am not suggesting that you must implement this for any runner. I'm
> afraid I do have to propose this change be rolled back before release
> 2.21.0 unless we fix this. I think the fix is easily achieved.
>
> Clarifications inline.
>
> On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Kenn,
>>
>> I think that this approach is not well maintainable and doesn't scale.
>> Main reasons:
>>
>>  a) modifying core has by definition some impact on runners, so modifying
>> core would imply necessity to modify all runners
>>
> My concern is not about all changes to "core" but only changes to the
> model, which should be extraordinarily rare. They must receive extreme
> scrutiny and require a very high level of consensus. It is true that every
> runner needs to either correctly execute or refuse to execute every
> pipeline, to the extent possible. For the case we are talking about it is
> very easy to meet this requirement.
>
>  b) having to implement core feature for all existing runners will make
>> any modification to core prohibitively expensive
>>
> No one is suggesting this. I am saying that you need to write the 1 line
> that I linked to "if (usesRequiresTimeSortedInput) then reject pipeline" so
> the runner fails before it begins processing data, potentially consuming
> non-replayable messages.
>
>
>>  c) even if we accept this, there can be runners that are outside of beam
>> repo (or even closed source!)
>>
> Indeed. And those runners need time to adapt to the new proto fields. I
> did not mention it this time, because the proto is not considered stable.
> But very soon it will be. At that point additions like this will have to be
> fully specified and added to the proto long before they are enabled for
> use. That way all runners can adjust. The proper order is (1) add model
> feature (2) make runners reject it, unsupported (3) add functionality to
> SDK (4) add to some runners and enable.
>
>
>> Therefore I think, that the correct and scalable approach would be to
>> split this into several pieces:
>>
>>  1) define pipeline requirements (this is pretty much similar to how we
>> currently scope @Category(ValidatesRunner.class) tests
>>
>>  2) let pipeline infer it's requirements prior to being translated via
>> runner
>>
>>  3) runner can check the set of required features and their support and
>> reject the pipeline if some feature is missing
>>
> This is exactly what happens today, but was not included in your change.
> The pipeline proto (or the Java pipeline object) clearly contain all the
> needed information. Whether pipeline summarizes it or the runner implements
> a trivial PipelineVisitor is not important.
>
> This could even replace the annotations used in validates runner tests,
>> because each runner would simply execute all tests it has enough features
>> to run.
>>
> What you have described is exactly what happens today.
>
>
>> But as I mentioned - this is pretty much deep change. I don't know how to
>> safely do this for current runners, but to actually implement the feature
>> (it seems to be to me nearly equally complicated to fail pipeline in batch
>> case and to actually implement the sorting).
>>
> Indeed. This feature hasn't really got consensus. The proposal thread [1]
> never really concluded affirmatively [1]. The [VOTE] thread indicates a
> clear *lack* of consensus, with all people who weighed in asking to raise
> awareness and build more support and consensus. Robert made the good point
> that if it is (a) useful and (b) not easy for users to do themselves, then
> we should consider it, even if most people here are not interested in the
> feature. So that is the closest thing to approval that this feature has.
> But getting more people interested and on board would get better feedback
> and achieve a better result for our users.
>
> And as a final note, the PR was not reviewed by the core people who built
> out state & timers, nor those who built out DoFn annotation systems, nor
> any runner author, nor those working on the Beam model protos. You really
> should have gotten most of these people involved. They would likely have
> caught the issues described here.
>
> The specific action that I am proposing is to implement the 1 liner
> described in all runners. It might be best to roll back and proceed with
> steps 1-4 I outlined above, so we can be sure things are proceeding well.
>
> Kenn
>
> [1]
> https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
> [2]
> https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E
>
>> It would be super cool if anyone would be interested in implementing this
>> in runners that don't currently support it. A side note - currently the
>> annotation is not supported by all streaming runners due to missing
>> guarantees for timers ordering (which can lead to data losss). I think I
>> have found a solution to this, see [1], but I'd like to be 100% sure,
>> before enabling the support (I'm not sure what is the impact of mis-ordered
>> timers on output timestamps, and so on, and so forth).
>>
>> Jan
>>
>> [1]
>> https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
>> On 2/7/20 7:53 PM, Kenneth Knowles wrote:
>>
>> I see. It is good to see that the pipeline will at least fail. However,
>> the expect approach here is that the pipeline is rejected prior to
>> execution. That is a primary reason for our annotation-driven API style; it
>> allows much better "static" analysis by a runner, so we don't have to wait
>> and fail late. Here is an example:
>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
>>
>> Kenn
>>
>> On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Kenn,
>>>
>>> that should not be the case. Care was taken to fail streaming pipeline
>>> which needs this ability and the runner doesn't support this [1]. It is
>>> true, however, that a batch pipeline will not fail, because there is no
>>> generic (runner agnostic) way of supporting this transform in batch case
>>> (which is why the annotation was needed). Failing batch pipelines in this
>>> case would mean runners have to understand this annotation, which is pretty
>>> much close to implementing this feature as a whole.
>>>
>>> This applies generally to any core functionality, it might take some
>>> time before runners fully support this. I don't know how to solve it, maybe
>>> add record to capability matrix? I can imagine a fully generic solution
>>> (runners might publish their capabilities and pipeline might be validated
>>> against these capabilities at pipeline build time), but that is obviously
>>> out of scope of the annotation.
>>>
>>> Jan
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
>>> On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>>>
>>> There is a major problem with this merge: the runners that do not
>>> support it do not reject pipelines that need this feature. They will
>>> silently produce the wrong answer, causing data loss.
>>>
>>> Kenn
>>>
>>> On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> the PR was merged to master and a few follow-up issues, were created,
>>>> mainly [1] and [2]. I didn't find any reference to SortedMapState in
>>>> JIRA, is there any tracking issue for that that I can link to? I also
>>>> added link to design document here [3].
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-9256
>>>>
>>>> [2] https://issues.apache.org/jira/browse/BEAM-9257
>>>>
>>>> [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>>>
>>>> On 1/30/20 1:39 PM, Jan Lukavský wrote:
>>>> > Hi,
>>>> >
>>>> > PR [1] (issue [2]) went though code review, and according to [3]
>>>> seems
>>>> > to me to be ready for merge. Current state of the implementation is
>>>> > that it is supported only for direct runner, legacy flink runner
>>>> > (batch and streaming) and legacy spark (batch). It could be supported
>>>> > by all other (streaming) runners using StatefulDoFnRunner, provided
>>>> > the runner can make guarantees about ordering of timer firings (which
>>>> > is unfortunately the case only for legacy flink and direct runner, at
>>>> > least for now - related issues are mentioned multiple times on other
>>>> > threads). Implementation for other batch runners should be as
>>>> > straightforward as adding sorting by event timestamp before stateful
>>>> > dofn (in case where the runner doesn't sort already - e.g. Dataflow -
>>>> > in which case the annotation can be simply ignored - hence support
>>>> for
>>>> > batch Dataflow seems to be a no-op).
>>>> >
>>>> > There has been some slight controversy about this feature, but
>>>> current
>>>> > feature proposing and implementing guidelines do not cover how to
>>>> > resolve those, so I'm using this opportunity to let the community
>>>> > know, that there is a plan to merge this feature, unless there is
>>>> some
>>>> > veto (please provide specific reasons for that in that case). The
>>>> plan
>>>> > is to merge this in the second part of next week, unless there is a
>>>> veto.
>>>> >
>>>> > Thanks,
>>>> >
>>>> >  Jan
>>>> >
>>>> > [1] https://github.com/apache/beam/pull/8774
>>>> >
>>>> > [2] https://issues.apache.org/jira/browse/BEAM-8550
>>>> >
>>>> > [3] https://beam.apache.org/contribute/committer-guide/
>>>> >
>>>>
>>>

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Jan Lukavský <je...@seznam.cz>.
I reviewed closely the runners ad it seems to me that:

  - all batch runners that would fail to support the annotation will 
fail already (spark structured streaming, apex) due to missing support 
for state or timers

  - streaming runners must explicitly enable this, _as long as they use 
StatefulDoFnRunner_, which is the case for apex, flink and samza

I will explicitly disable any pipeline with this annotation for:

  - dataflow, jet and gearpump (because I don't see usage of 
StatefulDoFnRunner, although I though there was one, that's my mistake)

  - all batch runners should either support the annotation or fail 
already (due to missing support for state or timers)

Does this proposal solve the issues you see?

Regarding the process of introducing this annotation I tried really hard 
to get to the best consensus I could. The same holds true for getting 
core people involved in the review process (explicitly mentioned in the 
PR, multiple mailing list threads). The PR was opened for discussion for 
more than half a year. But because I agree with you, I proposed the BIP, 
so that we can have a more explicit process for arriving at a consensus 
for features like this. I'd be happy though, if we can get to consensus 
about what to do now (if the steps I wrote above will solve every 
doubts) and have a deeper process for similar features for future cases. 
As I mentioned this feature is already implemented and having open PR 
into core for nearly a year is expensive to keep it in sync with master.

On 2/7/20 9:31 PM, Kenneth Knowles wrote:
> TL;DR I am not suggesting that you must implement this for any runner. 
> I'm afraid I do have to propose this change be rolled back before 
> release 2.21.0 unless we fix this. I think the fix is easily achieved.
>
> Clarifications inline.
>
> On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     Hi Kenn,
>
>     I think that this approach is not well maintainable and doesn't
>     scale. Main reasons:
>
>      a) modifying core has by definition some impact on runners, so
>     modifying core would imply necessity to modify all runners
>
> My concern is not about all changes to "core" but only changes to the 
> model, which should be extraordinarily rare. They must receive extreme 
> scrutiny and require a very high level of consensus. It is true that 
> every runner needs to either correctly execute or refuse to execute 
> every pipeline, to the extent possible. For the case we are talking 
> about it is very easy to meet this requirement.
>
>      b) having to implement core feature for all existing runners will
>     make any modification to core prohibitively expensive
>
> No one is suggesting this. I am saying that you need to write the 1 
> line that I linked to "if (usesRequiresTimeSortedInput) then reject 
> pipeline" so the runner fails before it begins processing data, 
> potentially consuming non-replayable messages.
>
>      c) even if we accept this, there can be runners that are outside
>     of beam repo (or even closed source!)
>
> Indeed. And those runners need time to adapt to the new proto fields. 
> I did not mention it this time, because the proto is not considered 
> stable. But very soon it will be. At that point additions like this 
> will have to be fully specified and added to the proto long before 
> they are enabled for use. That way all runners can adjust. The proper 
> order is (1) add model feature (2) make runners reject it, unsupported 
> (3) add functionality to SDK (4) add to some runners and enable.
>
>     Therefore I think, that the correct and scalable approach would be
>     to split this into several pieces:
>
>      1) define pipeline requirements (this is pretty much similar to
>     how we currently scope @Category(ValidatesRunner.class) tests
>
>      2) let pipeline infer it's requirements prior to being translated
>     via runner
>
>      3) runner can check the set of required features and their
>     support and reject the pipeline if some feature is missing
>
> This is exactly what happens today, but was not included in your 
> change. The pipeline proto (or the Java pipeline object) clearly 
> contain all the needed information. Whether pipeline summarizes it or 
> the runner implements a trivial PipelineVisitor is not important.
>
>     This could even replace the annotations used in validates runner
>     tests, because each runner would simply execute all tests it has
>     enough features to run.
>
> What you have described is exactly what happens today.
>
>     But as I mentioned - this is pretty much deep change. I don't know
>     how to safely do this for current runners, but to actually
>     implement the feature (it seems to be to me nearly equally
>     complicated to fail pipeline in batch case and to actually
>     implement the sorting).
>
> Indeed. This feature hasn't really got consensus. The proposal thread 
> [1] never really concluded affirmatively [1]. The [VOTE] thread 
> indicates a clear *lack* of consensus, with all people who weighed in 
> asking to raise awareness and build more support and consensus. Robert 
> made the good point that if it is (a) useful and (b) not easy for 
> users to do themselves, then we should consider it, even if most 
> people here are not interested in the feature. So that is the closest 
> thing to approval that this feature has. But getting more people 
> interested and on board would get better feedback and achieve a better 
> result for our users.
>
> And as a final note, the PR was not reviewed by the core people who 
> built out state & timers, nor those who built out DoFn annotation 
> systems, nor any runner author, nor those working on the Beam model 
> protos. You really should have gotten most of these people involved. 
> They would likely have caught the issues described here.
>
> The specific action that I am proposing is to implement the 1 liner 
> described in all runners. It might be best to roll back and proceed 
> with steps 1-4 I outlined above, so we can be sure things are 
> proceeding well.
>
> Kenn
>
> [1] 
> https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
> [2] 
> https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E
>
>     It would be super cool if anyone would be interested in
>     implementing this in runners that don't currently support it. A
>     side note - currently the annotation is not supported by all
>     streaming runners due to missing guarantees for timers ordering
>     (which can lead to data losss). I think I have found a solution to
>     this, see [1], but I'd like to be 100% sure, before enabling the
>     support (I'm not sure what is the impact of mis-ordered timers on
>     output timestamps, and so on, and so forth).
>
>     Jan
>
>     [1]
>     https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
>
>     On 2/7/20 7:53 PM, Kenneth Knowles wrote:
>>     I see. It is good to see that the pipeline will at least fail.
>>     However, the expect approach here is that the pipeline is
>>     rejected prior to execution. That is a primary reason for our
>>     annotation-driven API style; it allows much better "static"
>>     analysis by a runner, so we don't have to wait and fail late.
>>     Here is an example:
>>     https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
>>
>>
>>     Kenn
>>
>>     On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je.ik@seznam.cz
>>     <ma...@seznam.cz>> wrote:
>>
>>         Hi Kenn,
>>
>>         that should not be the case. Care was taken to fail streaming
>>         pipeline which needs this ability and the runner doesn't
>>         support this [1]. It is true, however, that a batch pipeline
>>         will not fail, because there is no generic (runner agnostic)
>>         way of supporting this transform in batch case (which is why
>>         the annotation was needed). Failing batch pipelines in this
>>         case would mean runners have to understand this annotation,
>>         which is pretty much close to implementing this feature as a
>>         whole.
>>
>>         This applies generally to any core functionality, it might
>>         take some time before runners fully support this. I don't
>>         know how to solve it, maybe add record to capability matrix?
>>         I can imagine a fully generic solution (runners might publish
>>         their capabilities and pipeline might be validated against
>>         these capabilities at pipeline build time), but that is
>>         obviously out of scope of the annotation.
>>
>>         Jan
>>
>>         [1]
>>         https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
>>
>>         On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>>>         There is a major problem with this merge: the runners that
>>>         do not support it do not reject pipelines that need this
>>>         feature. They will silently produce the wrong answer,
>>>         causing data loss.
>>>
>>>         Kenn
>>>
>>>         On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je.ik@seznam.cz
>>>         <ma...@seznam.cz>> wrote:
>>>
>>>             Hi,
>>>
>>>             the PR was merged to master and a few follow-up issues,
>>>             were created,
>>>             mainly [1] and [2]. I didn't find any reference to
>>>             SortedMapState in
>>>             JIRA, is there any tracking issue for that that I can
>>>             link to? I also
>>>             added link to design document here [3].
>>>
>>>             [1] https://issues.apache.org/jira/browse/BEAM-9256
>>>
>>>             [2] https://issues.apache.org/jira/browse/BEAM-9257
>>>
>>>             [3]
>>>             https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>>
>>>             On 1/30/20 1:39 PM, Jan Lukavský wrote:
>>>             > Hi,
>>>             >
>>>             > PR [1] (issue [2]) went though code review, and
>>>             according to [3] seems
>>>             > to me to be ready for merge. Current state of the
>>>             implementation is
>>>             > that it is supported only for direct runner, legacy
>>>             flink runner
>>>             > (batch and streaming) and legacy spark (batch). It
>>>             could be supported
>>>             > by all other (streaming) runners using
>>>             StatefulDoFnRunner, provided
>>>             > the runner can make guarantees about ordering of timer
>>>             firings (which
>>>             > is unfortunately the case only for legacy flink and
>>>             direct runner, at
>>>             > least for now - related issues are mentioned multiple
>>>             times on other
>>>             > threads). Implementation for other batch runners
>>>             should be as
>>>             > straightforward as adding sorting by event timestamp
>>>             before stateful
>>>             > dofn (in case where the runner doesn't sort already -
>>>             e.g. Dataflow -
>>>             > in which case the annotation can be simply ignored -
>>>             hence support for
>>>             > batch Dataflow seems to be a no-op).
>>>             >
>>>             > There has been some slight controversy about this
>>>             feature, but current
>>>             > feature proposing and implementing guidelines do not
>>>             cover how to
>>>             > resolve those, so I'm using this opportunity to let
>>>             the community
>>>             > know, that there is a plan to merge this feature,
>>>             unless there is some
>>>             > veto (please provide specific reasons for that in that
>>>             case). The plan
>>>             > is to merge this in the second part of next week,
>>>             unless there is a veto.
>>>             >
>>>             > Thanks,
>>>             >
>>>             >  Jan
>>>             >
>>>             > [1] https://github.com/apache/beam/pull/8774
>>>             >
>>>             > [2] https://issues.apache.org/jira/browse/BEAM-8550
>>>             >
>>>             > [3] https://beam.apache.org/contribute/committer-guide/
>>>             >
>>>

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Kenneth Knowles <ke...@apache.org>.
TL;DR I am not suggesting that you must implement this for any runner. I'm
afraid I do have to propose this change be rolled back before release
2.21.0 unless we fix this. I think the fix is easily achieved.

Clarifications inline.

On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Kenn,
>
> I think that this approach is not well maintainable and doesn't scale.
> Main reasons:
>
>  a) modifying core has by definition some impact on runners, so modifying
> core would imply necessity to modify all runners
>
My concern is not about all changes to "core" but only changes to the
model, which should be extraordinarily rare. They must receive extreme
scrutiny and require a very high level of consensus. It is true that every
runner needs to either correctly execute or refuse to execute every
pipeline, to the extent possible. For the case we are talking about it is
very easy to meet this requirement.

 b) having to implement core feature for all existing runners will make any
> modification to core prohibitively expensive
>
No one is suggesting this. I am saying that you need to write the 1 line
that I linked to "if (usesRequiresTimeSortedInput) then reject pipeline" so
the runner fails before it begins processing data, potentially consuming
non-replayable messages.


>  c) even if we accept this, there can be runners that are outside of beam
> repo (or even closed source!)
>
Indeed. And those runners need time to adapt to the new proto fields. I did
not mention it this time, because the proto is not considered stable. But
very soon it will be. At that point additions like this will have to be
fully specified and added to the proto long before they are enabled for
use. That way all runners can adjust. The proper order is (1) add model
feature (2) make runners reject it, unsupported (3) add functionality to
SDK (4) add to some runners and enable.


> Therefore I think, that the correct and scalable approach would be to
> split this into several pieces:
>
>  1) define pipeline requirements (this is pretty much similar to how we
> currently scope @Category(ValidatesRunner.class) tests
>
>  2) let pipeline infer it's requirements prior to being translated via
> runner
>
>  3) runner can check the set of required features and their support and
> reject the pipeline if some feature is missing
>
This is exactly what happens today, but was not included in your change.
The pipeline proto (or the Java pipeline object) clearly contain all the
needed information. Whether pipeline summarizes it or the runner implements
a trivial PipelineVisitor is not important.

This could even replace the annotations used in validates runner tests,
> because each runner would simply execute all tests it has enough features
> to run.
>
What you have described is exactly what happens today.


> But as I mentioned - this is pretty much deep change. I don't know how to
> safely do this for current runners, but to actually implement the feature
> (it seems to be to me nearly equally complicated to fail pipeline in batch
> case and to actually implement the sorting).
>
Indeed. This feature hasn't really got consensus. The proposal thread [1]
never really concluded affirmatively [1]. The [VOTE] thread indicates a
clear *lack* of consensus, with all people who weighed in asking to raise
awareness and build more support and consensus. Robert made the good point
that if it is (a) useful and (b) not easy for users to do themselves, then
we should consider it, even if most people here are not interested in the
feature. So that is the closest thing to approval that this feature has.
But getting more people interested and on board would get better feedback
and achieve a better result for our users.

And as a final note, the PR was not reviewed by the core people who built
out state & timers, nor those who built out DoFn annotation systems, nor
any runner author, nor those working on the Beam model protos. You really
should have gotten most of these people involved. They would likely have
caught the issues described here.

The specific action that I am proposing is to implement the 1 liner
described in all runners. It might be best to roll back and proceed with
steps 1-4 I outlined above, so we can be sure things are proceeding well.

Kenn

[1]
https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
[2]
https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E

> It would be super cool if anyone would be interested in implementing this
> in runners that don't currently support it. A side note - currently the
> annotation is not supported by all streaming runners due to missing
> guarantees for timers ordering (which can lead to data losss). I think I
> have found a solution to this, see [1], but I'd like to be 100% sure,
> before enabling the support (I'm not sure what is the impact of mis-ordered
> timers on output timestamps, and so on, and so forth).
>
> Jan
>
> [1]
> https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
> On 2/7/20 7:53 PM, Kenneth Knowles wrote:
>
> I see. It is good to see that the pipeline will at least fail. However,
> the expect approach here is that the pipeline is rejected prior to
> execution. That is a primary reason for our annotation-driven API style; it
> allows much better "static" analysis by a runner, so we don't have to wait
> and fail late. Here is an example:
> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
>
> Kenn
>
> On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Kenn,
>>
>> that should not be the case. Care was taken to fail streaming pipeline
>> which needs this ability and the runner doesn't support this [1]. It is
>> true, however, that a batch pipeline will not fail, because there is no
>> generic (runner agnostic) way of supporting this transform in batch case
>> (which is why the annotation was needed). Failing batch pipelines in this
>> case would mean runners have to understand this annotation, which is pretty
>> much close to implementing this feature as a whole.
>>
>> This applies generally to any core functionality, it might take some time
>> before runners fully support this. I don't know how to solve it, maybe add
>> record to capability matrix? I can imagine a fully generic solution
>> (runners might publish their capabilities and pipeline might be validated
>> against these capabilities at pipeline build time), but that is obviously
>> out of scope of the annotation.
>>
>> Jan
>>
>> [1]
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
>> On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>>
>> There is a major problem with this merge: the runners that do not support
>> it do not reject pipelines that need this feature. They will silently
>> produce the wrong answer, causing data loss.
>>
>> Kenn
>>
>> On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> the PR was merged to master and a few follow-up issues, were created,
>>> mainly [1] and [2]. I didn't find any reference to SortedMapState in
>>> JIRA, is there any tracking issue for that that I can link to? I also
>>> added link to design document here [3].
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-9256
>>>
>>> [2] https://issues.apache.org/jira/browse/BEAM-9257
>>>
>>> [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>>
>>> On 1/30/20 1:39 PM, Jan Lukavský wrote:
>>> > Hi,
>>> >
>>> > PR [1] (issue [2]) went though code review, and according to [3] seems
>>> > to me to be ready for merge. Current state of the implementation is
>>> > that it is supported only for direct runner, legacy flink runner
>>> > (batch and streaming) and legacy spark (batch). It could be supported
>>> > by all other (streaming) runners using StatefulDoFnRunner, provided
>>> > the runner can make guarantees about ordering of timer firings (which
>>> > is unfortunately the case only for legacy flink and direct runner, at
>>> > least for now - related issues are mentioned multiple times on other
>>> > threads). Implementation for other batch runners should be as
>>> > straightforward as adding sorting by event timestamp before stateful
>>> > dofn (in case where the runner doesn't sort already - e.g. Dataflow -
>>> > in which case the annotation can be simply ignored - hence support for
>>> > batch Dataflow seems to be a no-op).
>>> >
>>> > There has been some slight controversy about this feature, but current
>>> > feature proposing and implementing guidelines do not cover how to
>>> > resolve those, so I'm using this opportunity to let the community
>>> > know, that there is a plan to merge this feature, unless there is some
>>> > veto (please provide specific reasons for that in that case). The plan
>>> > is to merge this in the second part of next week, unless there is a
>>> veto.
>>> >
>>> > Thanks,
>>> >
>>> >  Jan
>>> >
>>> > [1] https://github.com/apache/beam/pull/8774
>>> >
>>> > [2] https://issues.apache.org/jira/browse/BEAM-8550
>>> >
>>> > [3] https://beam.apache.org/contribute/committer-guide/
>>> >
>>>
>>

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Kenn,

I think that this approach is not well maintainable and doesn't scale. 
Main reasons:

  a) modifying core has by definition some impact on runners, so 
modifying core would imply necessity to modify all runners

  b) having to implement core feature for all existing runners will make 
any modification to core prohibitively expensive

  c) even if we accept this, there can be runners that are outside of 
beam repo (or even closed source!)

Therefore I think, that the correct and scalable approach would be to 
split this into several pieces:

  1) define pipeline requirements (this is pretty much similar to how we 
currently scope @Category(ValidatesRunner.class) tests

  2) let pipeline infer it's requirements prior to being translated via 
runner

  3) runner can check the set of required features and their support and 
reject the pipeline if some feature is missing

This could even replace the annotations used in validates runner tests, 
because each runner would simply execute all tests it has enough 
features to run.

But as I mentioned - this is pretty much deep change. I don't know how 
to safely do this for current runners, but to actually implement the 
feature (it seems to be to me nearly equally complicated to fail 
pipeline in batch case and to actually implement the sorting). It would 
be super cool if anyone would be interested in implementing this in 
runners that don't currently support it. A side note - currently the 
annotation is not supported by all streaming runners due to missing 
guarantees for timers ordering (which can lead to data losss). I think I 
have found a solution to this, see [1], but I'd like to be 100% sure, 
before enabling the support (I'm not sure what is the impact of 
mis-ordered timers on output timestamps, and so on, and so forth).

Jan

[1] 
https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209

On 2/7/20 7:53 PM, Kenneth Knowles wrote:
> I see. It is good to see that the pipeline will at least fail. 
> However, the expect approach here is that the pipeline is rejected 
> prior to execution. That is a primary reason for our annotation-driven 
> API style; it allows much better "static" analysis by a runner, so we 
> don't have to wait and fail late. Here is an example: 
> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940 
>
>
> Kenn
>
> On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     Hi Kenn,
>
>     that should not be the case. Care was taken to fail streaming
>     pipeline which needs this ability and the runner doesn't support
>     this [1]. It is true, however, that a batch pipeline will not
>     fail, because there is no generic (runner agnostic) way of
>     supporting this transform in batch case (which is why the
>     annotation was needed). Failing batch pipelines in this case would
>     mean runners have to understand this annotation, which is pretty
>     much close to implementing this feature as a whole.
>
>     This applies generally to any core functionality, it might take
>     some time before runners fully support this. I don't know how to
>     solve it, maybe add record to capability matrix? I can imagine a
>     fully generic solution (runners might publish their capabilities
>     and pipeline might be validated against these capabilities at
>     pipeline build time), but that is obviously out of scope of the
>     annotation.
>
>     Jan
>
>     [1]
>     https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
>
>     On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>>     There is a major problem with this merge: the runners that do not
>>     support it do not reject pipelines that need this feature. They
>>     will silently produce the wrong answer, causing data loss.
>>
>>     Kenn
>>
>>     On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je.ik@seznam.cz
>>     <ma...@seznam.cz>> wrote:
>>
>>         Hi,
>>
>>         the PR was merged to master and a few follow-up issues, were
>>         created,
>>         mainly [1] and [2]. I didn't find any reference to
>>         SortedMapState in
>>         JIRA, is there any tracking issue for that that I can link
>>         to? I also
>>         added link to design document here [3].
>>
>>         [1] https://issues.apache.org/jira/browse/BEAM-9256
>>
>>         [2] https://issues.apache.org/jira/browse/BEAM-9257
>>
>>         [3]
>>         https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>
>>         On 1/30/20 1:39 PM, Jan Lukavský wrote:
>>         > Hi,
>>         >
>>         > PR [1] (issue [2]) went though code review, and according
>>         to [3] seems
>>         > to me to be ready for merge. Current state of the
>>         implementation is
>>         > that it is supported only for direct runner, legacy flink
>>         runner
>>         > (batch and streaming) and legacy spark (batch). It could be
>>         supported
>>         > by all other (streaming) runners using StatefulDoFnRunner,
>>         provided
>>         > the runner can make guarantees about ordering of timer
>>         firings (which
>>         > is unfortunately the case only for legacy flink and direct
>>         runner, at
>>         > least for now - related issues are mentioned multiple times
>>         on other
>>         > threads). Implementation for other batch runners should be as
>>         > straightforward as adding sorting by event timestamp before
>>         stateful
>>         > dofn (in case where the runner doesn't sort already - e.g.
>>         Dataflow -
>>         > in which case the annotation can be simply ignored - hence
>>         support for
>>         > batch Dataflow seems to be a no-op).
>>         >
>>         > There has been some slight controversy about this feature,
>>         but current
>>         > feature proposing and implementing guidelines do not cover
>>         how to
>>         > resolve those, so I'm using this opportunity to let the
>>         community
>>         > know, that there is a plan to merge this feature, unless
>>         there is some
>>         > veto (please provide specific reasons for that in that
>>         case). The plan
>>         > is to merge this in the second part of next week, unless
>>         there is a veto.
>>         >
>>         > Thanks,
>>         >
>>         >  Jan
>>         >
>>         > [1] https://github.com/apache/beam/pull/8774
>>         >
>>         > [2] https://issues.apache.org/jira/browse/BEAM-8550
>>         >
>>         > [3] https://beam.apache.org/contribute/committer-guide/
>>         >
>>

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Kenneth Knowles <ke...@apache.org>.
I see. It is good to see that the pipeline will at least fail. However, the
expect approach here is that the pipeline is rejected prior to execution.
That is a primary reason for our annotation-driven API style; it allows
much better "static" analysis by a runner, so we don't have to wait and
fail late. Here is an example:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940

Kenn

On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Kenn,
>
> that should not be the case. Care was taken to fail streaming pipeline
> which needs this ability and the runner doesn't support this [1]. It is
> true, however, that a batch pipeline will not fail, because there is no
> generic (runner agnostic) way of supporting this transform in batch case
> (which is why the annotation was needed). Failing batch pipelines in this
> case would mean runners have to understand this annotation, which is pretty
> much close to implementing this feature as a whole.
>
> This applies generally to any core functionality, it might take some time
> before runners fully support this. I don't know how to solve it, maybe add
> record to capability matrix? I can imagine a fully generic solution
> (runners might publish their capabilities and pipeline might be validated
> against these capabilities at pipeline build time), but that is obviously
> out of scope of the annotation.
>
> Jan
>
> [1]
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
> On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>
> There is a major problem with this merge: the runners that do not support
> it do not reject pipelines that need this feature. They will silently
> produce the wrong answer, causing data loss.
>
> Kenn
>
> On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> the PR was merged to master and a few follow-up issues, were created,
>> mainly [1] and [2]. I didn't find any reference to SortedMapState in
>> JIRA, is there any tracking issue for that that I can link to? I also
>> added link to design document here [3].
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-9256
>>
>> [2] https://issues.apache.org/jira/browse/BEAM-9257
>>
>> [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>
>> On 1/30/20 1:39 PM, Jan Lukavský wrote:
>> > Hi,
>> >
>> > PR [1] (issue [2]) went though code review, and according to [3] seems
>> > to me to be ready for merge. Current state of the implementation is
>> > that it is supported only for direct runner, legacy flink runner
>> > (batch and streaming) and legacy spark (batch). It could be supported
>> > by all other (streaming) runners using StatefulDoFnRunner, provided
>> > the runner can make guarantees about ordering of timer firings (which
>> > is unfortunately the case only for legacy flink and direct runner, at
>> > least for now - related issues are mentioned multiple times on other
>> > threads). Implementation for other batch runners should be as
>> > straightforward as adding sorting by event timestamp before stateful
>> > dofn (in case where the runner doesn't sort already - e.g. Dataflow -
>> > in which case the annotation can be simply ignored - hence support for
>> > batch Dataflow seems to be a no-op).
>> >
>> > There has been some slight controversy about this feature, but current
>> > feature proposing and implementing guidelines do not cover how to
>> > resolve those, so I'm using this opportunity to let the community
>> > know, that there is a plan to merge this feature, unless there is some
>> > veto (please provide specific reasons for that in that case). The plan
>> > is to merge this in the second part of next week, unless there is a
>> veto.
>> >
>> > Thanks,
>> >
>> >  Jan
>> >
>> > [1] https://github.com/apache/beam/pull/8774
>> >
>> > [2] https://issues.apache.org/jira/browse/BEAM-8550
>> >
>> > [3] https://beam.apache.org/contribute/committer-guide/
>> >
>>
>

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Kenn,

that should not be the case. Care was taken to fail streaming pipeline 
which needs this ability and the runner doesn't support this [1]. It is 
true, however, that a batch pipeline will not fail, because there is no 
generic (runner agnostic) way of supporting this transform in batch case 
(which is why the annotation was needed). Failing batch pipelines in 
this case would mean runners have to understand this annotation, which 
is pretty much close to implementing this feature as a whole.

This applies generally to any core functionality, it might take some 
time before runners fully support this. I don't know how to solve it, 
maybe add record to capability matrix? I can imagine a fully generic 
solution (runners might publish their capabilities and pipeline might be 
validated against these capabilities at pipeline build time), but that 
is obviously out of scope of the annotation.

Jan

[1] 
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150

On 2/7/20 1:01 AM, Kenneth Knowles wrote:
> There is a major problem with this merge: the runners that do not 
> support it do not reject pipelines that need this feature. They will 
> silently produce the wrong answer, causing data loss.
>
> Kenn
>
> On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     Hi,
>
>     the PR was merged to master and a few follow-up issues, were created,
>     mainly [1] and [2]. I didn't find any reference to SortedMapState in
>     JIRA, is there any tracking issue for that that I can link to? I also
>     added link to design document here [3].
>
>     [1] https://issues.apache.org/jira/browse/BEAM-9256
>
>     [2] https://issues.apache.org/jira/browse/BEAM-9257
>
>     [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>
>     On 1/30/20 1:39 PM, Jan Lukavský wrote:
>     > Hi,
>     >
>     > PR [1] (issue [2]) went though code review, and according to [3]
>     seems
>     > to me to be ready for merge. Current state of the implementation is
>     > that it is supported only for direct runner, legacy flink runner
>     > (batch and streaming) and legacy spark (batch). It could be
>     supported
>     > by all other (streaming) runners using StatefulDoFnRunner, provided
>     > the runner can make guarantees about ordering of timer firings
>     (which
>     > is unfortunately the case only for legacy flink and direct
>     runner, at
>     > least for now - related issues are mentioned multiple times on
>     other
>     > threads). Implementation for other batch runners should be as
>     > straightforward as adding sorting by event timestamp before
>     stateful
>     > dofn (in case where the runner doesn't sort already - e.g.
>     Dataflow -
>     > in which case the annotation can be simply ignored - hence
>     support for
>     > batch Dataflow seems to be a no-op).
>     >
>     > There has been some slight controversy about this feature, but
>     current
>     > feature proposing and implementing guidelines do not cover how to
>     > resolve those, so I'm using this opportunity to let the community
>     > know, that there is a plan to merge this feature, unless there
>     is some
>     > veto (please provide specific reasons for that in that case).
>     The plan
>     > is to merge this in the second part of next week, unless there
>     is a veto.
>     >
>     > Thanks,
>     >
>     >  Jan
>     >
>     > [1] https://github.com/apache/beam/pull/8774
>     >
>     > [2] https://issues.apache.org/jira/browse/BEAM-8550
>     >
>     > [3] https://beam.apache.org/contribute/committer-guide/
>     >
>

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Kenneth Knowles <ke...@apache.org>.
There is a major problem with this merge: the runners that do not support
it do not reject pipelines that need this feature. They will silently
produce the wrong answer, causing data loss.

Kenn

On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> the PR was merged to master and a few follow-up issues, were created,
> mainly [1] and [2]. I didn't find any reference to SortedMapState in
> JIRA, is there any tracking issue for that that I can link to? I also
> added link to design document here [3].
>
> [1] https://issues.apache.org/jira/browse/BEAM-9256
>
> [2] https://issues.apache.org/jira/browse/BEAM-9257
>
> [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>
> On 1/30/20 1:39 PM, Jan Lukavský wrote:
> > Hi,
> >
> > PR [1] (issue [2]) went though code review, and according to [3] seems
> > to me to be ready for merge. Current state of the implementation is
> > that it is supported only for direct runner, legacy flink runner
> > (batch and streaming) and legacy spark (batch). It could be supported
> > by all other (streaming) runners using StatefulDoFnRunner, provided
> > the runner can make guarantees about ordering of timer firings (which
> > is unfortunately the case only for legacy flink and direct runner, at
> > least for now - related issues are mentioned multiple times on other
> > threads). Implementation for other batch runners should be as
> > straightforward as adding sorting by event timestamp before stateful
> > dofn (in case where the runner doesn't sort already - e.g. Dataflow -
> > in which case the annotation can be simply ignored - hence support for
> > batch Dataflow seems to be a no-op).
> >
> > There has been some slight controversy about this feature, but current
> > feature proposing and implementing guidelines do not cover how to
> > resolve those, so I'm using this opportunity to let the community
> > know, that there is a plan to merge this feature, unless there is some
> > veto (please provide specific reasons for that in that case). The plan
> > is to merge this in the second part of next week, unless there is a veto.
> >
> > Thanks,
> >
> >  Jan
> >
> > [1] https://github.com/apache/beam/pull/8774
> >
> > [2] https://issues.apache.org/jira/browse/BEAM-8550
> >
> > [3] https://beam.apache.org/contribute/committer-guide/
> >
>

Re: [BEAM-8550] @RequiresTimeSortedInput ready for merge to master

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

the PR was merged to master and a few follow-up issues, were created, 
mainly [1] and [2]. I didn't find any reference to SortedMapState in 
JIRA, is there any tracking issue for that that I can link to? I also 
added link to design document here [3].

[1] https://issues.apache.org/jira/browse/BEAM-9256

[2] https://issues.apache.org/jira/browse/BEAM-9257

[3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents

On 1/30/20 1:39 PM, Jan Lukavský wrote:
> Hi,
>
> PR [1] (issue [2]) went though code review, and according to [3] seems 
> to me to be ready for merge. Current state of the implementation is 
> that it is supported only for direct runner, legacy flink runner 
> (batch and streaming) and legacy spark (batch). It could be supported 
> by all other (streaming) runners using StatefulDoFnRunner, provided 
> the runner can make guarantees about ordering of timer firings (which 
> is unfortunately the case only for legacy flink and direct runner, at 
> least for now - related issues are mentioned multiple times on other 
> threads). Implementation for other batch runners should be as 
> straightforward as adding sorting by event timestamp before stateful 
> dofn (in case where the runner doesn't sort already - e.g. Dataflow - 
> in which case the annotation can be simply ignored - hence support for 
> batch Dataflow seems to be a no-op).
>
> There has been some slight controversy about this feature, but current 
> feature proposing and implementing guidelines do not cover how to 
> resolve those, so I'm using this opportunity to let the community 
> know, that there is a plan to merge this feature, unless there is some 
> veto (please provide specific reasons for that in that case). The plan 
> is to merge this in the second part of next week, unless there is a veto.
>
> Thanks,
>
>  Jan
>
> [1] https://github.com/apache/beam/pull/8774
>
> [2] https://issues.apache.org/jira/browse/BEAM-8550
>
> [3] https://beam.apache.org/contribute/committer-guide/
>