Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/25 12:05:08 UTC

[GitHub] [beam] je-ik opened a new pull request #11808: [WIP] [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

je-ik opened a new pull request #11808:
URL: https://github.com/apache/beam/pull/11808


   WIP for BEAM-10072
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] stale[bot] commented on pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #11808:
URL: https://github.com/apache/beam/pull/11808#issuecomment-683641299


   This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
   





[GitHub] [beam] je-ik commented on pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #11808:
URL: https://github.com/apache/beam/pull/11808#issuecomment-647699712


   Run Flink ValidatesRunner





[GitHub] [beam] je-ik commented on pull request #11808: [WIP] [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #11808:
URL: https://github.com/apache/beam/pull/11808#issuecomment-633540317


   We might also want to split this into two PRs: one to unify the logic in `DoFnSignature` vs. `DoFnSignatures`, and the other to add a test for a stateless DoFn with `@RequiresTimeSortedInput`. We need a resolution to [this thread](https://lists.apache.org/thread.html/r3c2ff415e5dd866b5a6c95a22606e7bd61eda4cbfac9ba598bdf4c71%40%3Cdev.beam.apache.org%3E)
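A minimal sketch of what "unifying the logic" could look like: one shared statefulness predicate instead of a condition inlined in each runner. It assumes, as the `ParDoTranslation` change in this PR suggests, that the shared predicate also counts `@RequiresTimeSortedInput`. `SignatureSketch`, `declaresStateOrTimers`, and `isStateful` here are hypothetical stand-ins, not Beam's `DoFnSignature`/`DoFnSignatures` API.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model: SignatureSketch is an illustrative stand-in, not Beam's DoFnSignature.
public class StatefulnessCheck {

  static final class SignatureSketch {
    final List<String> stateDeclarations;
    final List<String> timerDeclarations;
    final boolean requiresTimeSortedInput;

    SignatureSketch(
        List<String> stateDeclarations,
        List<String> timerDeclarations,
        boolean requiresTimeSortedInput) {
      this.stateDeclarations = stateDeclarations;
      this.timerDeclarations = timerDeclarations;
      this.requiresTimeSortedInput = requiresTimeSortedInput;
    }
  }

  // Mirrors the condition that was previously inlined in the Flink translator.
  static boolean declaresStateOrTimers(SignatureSketch s) {
    return !s.stateDeclarations.isEmpty() || !s.timerDeclarations.isEmpty();
  }

  // Assumed shape of a single shared predicate: time-sorted input also needs keyed execution.
  static boolean isStateful(SignatureSketch s) {
    return declaresStateOrTimers(s) || s.requiresTimeSortedInput;
  }

  public static void main(String[] args) {
    SignatureSketch statelessSorted =
        new SignatureSketch(Collections.emptyList(), Collections.emptyList(), true);
    System.out.println(declaresStateOrTimers(statelessSorted)); // false
    System.out.println(isStateful(statelessSorted)); // true
  }
}
```

The stateless-but-sorted case is exactly where the old inlined condition and the shared predicate disagree.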





[GitHub] [beam] lukecwik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r443789083



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
##########
@@ -510,15 +509,13 @@ public RawUnionValue map(T o) throws Exception {
 
       Coder keyCoder = null;
       KeySelector<WindowedValue<InputT>, ?> keySelector = null;
-      boolean stateful = false;
-      DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-      if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) {
+      boolean stateful = DoFnSignatures.isStateful(doFn);
+      if (stateful) {
         // Based on the fact that the signature is stateful, DoFnSignatures ensures
         // that it is also keyed
         keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
         keySelector = new KvToByteBufferKeySelector(keyCoder);
         inputDataStream = inputDataStream.keyBy(keySelector);
-        stateful = true;
       } else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {

Review comment:
       This is an example where the way a runner chooses to execute a transform is what makes it "stateful".
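A simplified sketch of the point above, loosely following the branch structure in the quoted hunk: the splittable `ProcessFn` branch ends up with a keyed stream even though no user-declared state is involved. The enum and method names are hypothetical and are not Flink runner APIs.

```java
// Hypothetical sketch: Partitioning and choosePartitioning are illustrative names only.
public class KeyingDecision {

  enum Partitioning {
    KEYED_BY_ELEMENT_KEY,
    KEYED_FOR_SPLITTABLE_WORK,
    UNKEYED
  }

  static Partitioning choosePartitioning(boolean isStateful, boolean isSplittableProcessFn) {
    if (isStateful) {
      // The user's DoFn needs per-key state, so the input is keyed by the KV key.
      return Partitioning.KEYED_BY_ELEMENT_KEY;
    } else if (isSplittableProcessFn) {
      // No user-declared state, but the runner's own expansion is executed on a keyed stream.
      return Partitioning.KEYED_FOR_SPLITTABLE_WORK;
    }
    return Partitioning.UNKEYED;
  }

  public static void main(String[] args) {
    System.out.println(choosePartitioning(false, true)); // KEYED_FOR_SPLITTABLE_WORK
  }
}
```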

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##########
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
     ParDoPayload payload = getParDoPayload(transform);
-    return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
+    return payload.getStateSpecsCount() > 0
+        || payload.getTimerFamilySpecsCount() > 0
+        || payload.getRequiresTimeSortedInput();

Review comment:
       I don't believe we reached a consensus on the mailing list to have "usesStateOrTimers" represent how a runner executes the DoFn, since a runner may choose not to implement RequiresTimeSortedInput using a wrapping stateful DoFn.
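To make the distinction concrete, here is a minimal sketch that keeps "what the user declared" separate from "how the ordering requirement is satisfied". `PayloadView` stands in for the `ParDoPayload` fields used in the hunk, and the `Predicate`s stand in for `PTransformMatcher`-style checks; all names are hypothetical, not Beam APIs.

```java
import java.util.function.Predicate;

// Hypothetical sketch: PayloadView and the predicates below are illustrative, not Beam APIs.
public class MatcherSketch {

  static final class PayloadView {
    final int stateSpecsCount;
    final int timerFamilySpecsCount;
    final boolean requiresTimeSortedInput;

    PayloadView(int stateSpecsCount, int timerFamilySpecsCount, boolean requiresTimeSortedInput) {
      this.stateSpecsCount = stateSpecsCount;
      this.timerFamilySpecsCount = timerFamilySpecsCount;
      this.requiresTimeSortedInput = requiresTimeSortedInput;
    }
  }

  // Describes only what the user declared on the DoFn (the original condition).
  static final Predicate<PayloadView> USES_STATE_OR_TIMERS =
      p -> p.stateSpecsCount > 0 || p.timerFamilySpecsCount > 0;

  // A separate check for the ordering requirement; the runner decides how to satisfy it.
  static final Predicate<PayloadView> REQUIRES_TIME_SORTED_INPUT =
      p -> p.requiresTimeSortedInput;

  static String sortingStrategy(PayloadView p, boolean runnerCanSortInShuffle) {
    if (!REQUIRES_TIME_SORTED_INPUT.test(p)) {
      return "none";
    }
    return runnerCanSortInShuffle ? "sort-in-shuffle" : "stateful-buffering";
  }

  public static void main(String[] args) {
    PayloadView statelessSorted = new PayloadView(0, 0, true);
    System.out.println(USES_STATE_OR_TIMERS.test(statelessSorted)); // false
    System.out.println(sortingStrategy(statelessSorted, true)); // sort-in-shuffle
    System.out.println(sortingStrategy(statelessSorted, false)); // stateful-buffering
  }
}
```

With this split, only runners that satisfy the ordering through a wrapping stateful DoFn need to treat the transform as stateful.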

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -2572,6 +2572,49 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
           false);
     }
 
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithStatelessDoFn() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        stream = stream.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+      }
+      testTimeSortedInputStateless(
+          numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,

Review comment:
       According to its javadoc, RequiresTimeSortedInput shouldn't have any meaningful effect on stateless DoFns. So I'm confused as to why this should be allowed rather than being an error during DoFn signature validation.
   
   If this is a change to the javadoc, please update it as part of this PR.
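For comparison, the stricter alternative raised in this comment (rejecting the annotation on a DoFn without state) could look roughly like the sketch below. `SortedInputValidation` and `validateSortedInput` are hypothetical; this is not what the PR implements, since the PR keeps the combination legal and updates the javadoc instead.

```java
// Hypothetical sketch of the stricter alternative; validateSortedInput is not a Beam method.
public class SortedInputValidation {

  static void validateSortedInput(
      String doFnName, boolean declaresStateOrTimers, boolean requiresTimeSortedInput) {
    if (requiresTimeSortedInput && !declaresStateOrTimers) {
      // Under the old javadoc, ordering could not matter for a DoFn without state.
      throw new IllegalArgumentException(
          doFnName + " uses @RequiresTimeSortedInput but declares no state or timers");
    }
  }

  public static void main(String[] args) {
    validateSortedInput("StatefulSortedFn", true, true); // accepted
    try {
      validateSortedInput("StatelessSortedFn", false, true);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```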







[GitHub] [beam] je-ik commented on pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #11808:
URL: https://github.com/apache/beam/pull/11808#issuecomment-648730667


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r445205831



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -2572,6 +2572,49 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
           false);
     }
 
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithStatelessDoFn() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        stream = stream.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+      }
+      testTimeSortedInputStateless(
+          numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,

Review comment:
       makes sense







[GitHub] [beam] lukecwik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r445207524



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##########
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
     ParDoPayload payload = getParDoPayload(transform);
-    return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
+    return payload.getStateSpecsCount() > 0
+        || payload.getTimerFamilySpecsCount() > 0
+        || payload.getRequiresTimeSortedInput();

Review comment:
       I think it is important to model how something is executed separately from the user's definition of their transform (so using the PTransformMatcher makes a lot of sense, since not all runners will use state). For example, a runner can sort using its shuffle implementation, with the timestamp as the sort key (this is something that Dataflow does for some batch pipelines), and other runners may choose to do this as well.
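A minimal in-memory sketch of the shuffle-based approach described above: elements are ordered by a composite (key, timestamp) sort key, so each key's group arrives already time-sorted and the DoFn itself needs no state. `ShuffleSortSketch` and `groupSortedByTimestamp` are hypothetical names; a real runner would do this inside its distributed shuffle rather than with an in-memory sort.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: an in-memory stand-in for a batch shuffle sorted by (key, timestamp).
public class ShuffleSortSketch {

  static final class Element {
    final String key;
    final long timestampMillis;
    final String value;

    Element(String key, long timestampMillis, String value) {
      this.key = key;
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  static Map<String, List<String>> groupSortedByTimestamp(List<Element> input) {
    List<Element> sorted = new ArrayList<>(input);
    // Sort by key first, then by timestamp, like a shuffle with a secondary sort key.
    sorted.sort(
        Comparator.comparing((Element e) -> e.key).thenComparingLong(e -> e.timestampMillis));
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (Element e : sorted) {
      grouped.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e.value);
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Element> input =
        Arrays.asList(
            new Element("a", 30, "a3"),
            new Element("b", 20, "b2"),
            new Element("a", 10, "a1"),
            new Element("b", 5, "b1"),
            new Element("a", 20, "a2"));
    System.out.println(groupSortedByTimestamp(input)); // {a=[a1, a2, a3], b=[b1, b2]}
  }
}
```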







[GitHub] [beam] je-ik commented on pull request #11808: [WIP] [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #11808:
URL: https://github.com/apache/beam/pull/11808#issuecomment-633539604


   cc @dmvk 
   This is just a WIP, but I'd be glad to hear any suggestions you have so far.





[GitHub] [beam] je-ik commented on pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #11808:
URL: https://github.com/apache/beam/pull/11808#issuecomment-647699486


   Run Direct ValidatesRunner





[GitHub] [beam] stale[bot] closed pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
stale[bot] closed pull request #11808:
URL: https://github.com/apache/beam/pull/11808


   





[GitHub] [beam] je-ik commented on pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #11808:
URL: https://github.com/apache/beam/pull/11808#issuecomment-647699425


   Run Java PreCommit





[GitHub] [beam] je-ik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r444722211



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##########
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
     ParDoPayload payload = getParDoPayload(transform);
-    return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
+    return payload.getStateSpecsCount() > 0
+        || payload.getTimerFamilySpecsCount() > 0
+        || payload.getRequiresTimeSortedInput();

Review comment:
       That is correct. I'm aware there was no clear consensus, so I tried to minimize the changes needed to fix the underlying issue. I think that RequiresTimeSortedInput implies "statefulness" (which is the actual purpose of this method), because there is no way to sort data without state (at least in the generic streaming case). The alternative seems to be to introduce a new `PTransformMatcher`, but that would only serve the case of "stateless" ordered DoFns, which, although legitimate, are something of a corner case. The vast majority of uses of RequiresTimeSortedInput will match the original condition of `getStateSpecsCount() > 0`. I'm not sure the additional complexity of another PTransformMatcher, and of incorporating it into all runners that support the annotation, is worth it. WDYT?
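To illustrate why sorting in the generic streaming case needs state, here is a minimal in-memory sketch, assuming elements are buffered per key and released once the watermark passes them. `WatermarkSortBuffer` and its methods are hypothetical: the priority queue stands in for Beam state and `advanceWatermark` for an event-time timer firing. Late data (elements arriving behind the watermark) is not handled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: the buffer plays the role of per-key state, and advanceWatermark plays
// the role of an event-time timer firing. Late data is not handled here.
public class WatermarkSortBuffer {

  static final class Timestamped {
    final long timestampMillis;
    final String value;

    Timestamped(long timestampMillis, String value) {
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  private final PriorityQueue<Timestamped> buffer =
      new PriorityQueue<>(Comparator.comparingLong((Timestamped t) -> t.timestampMillis));

  void add(long timestampMillis, String value) {
    buffer.add(new Timestamped(timestampMillis, value));
  }

  // Emits, in timestamp order, every buffered element that is earlier than the watermark.
  List<String> advanceWatermark(long watermarkMillis) {
    List<String> out = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestampMillis < watermarkMillis) {
      out.add(buffer.poll().value);
    }
    return out;
  }

  public static void main(String[] args) {
    WatermarkSortBuffer sorter = new WatermarkSortBuffer();
    sorter.add(30, "c");
    sorter.add(10, "a");
    sorter.add(20, "b");
    System.out.println(sorter.advanceWatermark(25)); // [a, b]
    System.out.println(sorter.advanceWatermark(Long.MAX_VALUE)); // [c]
  }
}
```

Nothing can be emitted before the watermark advances, which is why the buffered elements have to live somewhere between calls.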

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -2572,6 +2572,49 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
           false);
     }
 
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithStatelessDoFn() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        stream = stream.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+      }
+      testTimeSortedInputStateless(
+          numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,

Review comment:
       I updated the javadoc, thanks. The comment _Note that this annotation makes sense only for stateful ParDos, because outcome of stateless functions cannot depend on the ordering._ was replaced because it was built on a false premise: even a stateless DoFn can require ordering if it uses some form of "implicit" state, such as an RPC, an internal clock, or a similar mechanism.
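A small sketch of the "implicit state" case: the processing function declares no state of its own, but it talks to an external store (simulated here in memory) that remembers the last value it received, so the output depends on the order in which elements arrive. `FakeRemoteStore`, `exchange`, and `process` are hypothetical illustrations, not Beam or service APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: FakeRemoteStore simulates an external service reached via RPC.
public class ImplicitStateSketch {

  static final class FakeRemoteStore {
    private Long last = null;

    // Returns the delta from the previously stored value, then stores the new value.
    long exchange(long value) {
      long delta = (last == null) ? 0 : value - last;
      last = value;
      return delta;
    }
  }

  // Declares no state itself, yet its output depends on the order of the inputs.
  static List<Long> process(List<Long> inputs) {
    FakeRemoteStore store = new FakeRemoteStore();
    List<Long> out = new ArrayList<>();
    for (long v : inputs) {
      out.add(store.exchange(v));
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(process(Arrays.asList(1L, 2L, 4L))); // [0, 1, 2]
    System.out.println(process(Arrays.asList(4L, 2L, 1L))); // [0, -2, -1]
  }
}
```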
   







[GitHub] [beam] lukecwik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r445207524



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##########
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
     ParDoPayload payload = getParDoPayload(transform);
-    return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
+    return payload.getStateSpecsCount() > 0
+        || payload.getTimerFamilySpecsCount() > 0
+        || payload.getRequiresTimeSortedInput();

Review comment:
       I think modelling how something is executed using the PTransformMatcher makes a lot of sense, since not all runners will use state. For example, a runner can sort using its shuffle implementation, with the timestamp as the sort key (this is something that Dataflow does for some batch pipelines), and other runners may choose to do this as well.







[GitHub] [beam] stale[bot] commented on pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #11808:
URL: https://github.com/apache/beam/pull/11808#issuecomment-678973723


   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
   

