Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/24 08:36:17 UTC

[GitHub] [beam] je-ik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

je-ik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r444722211



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##########
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
     ParDoPayload payload = getParDoPayload(transform);
-    return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
+    return payload.getStateSpecsCount() > 0
+        || payload.getTimerFamilySpecsCount() > 0
+        || payload.getRequiresTimeSortedInput();

Review comment:
       That is correct. I'm aware there was no clear consensus, so I tried to minimize the changes needed to fix the underlying issue. I think that RequiresTimeSortedInput implies "statefulness" (which is the actual purpose of this method), because there is no way to sort data without state (at least in the generic streaming case). The alternative seems to be to introduce a new `PTransformMatcher`, but that would serve only the case of "stateless" ordered DoFns, which, although legitimate, are something of a corner case. The vast majority of uses of RequiresTimeSortedInput will already match the original condition of `getStateSpecsCount() > 0`. I'm not sure that the additional complexity of another PTransformMatcher, and of incorporating it into all runners that support the annotation, is worth it. WDYT?
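
       To illustrate why sorting by event time needs state in the streaming case, here is a minimal sketch in plain Java. It is not Beam code and does not claim to show how any runner implements the annotation; `TimeSortingBuffer`, `Stamped`, `add`, `advanceWatermark` and `buffered` are hypothetical names invented for this example. The point is only that out-of-order elements have to be held somewhere until the watermark passes them, and that holding area is state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical illustration only; not part of Beam. */
final class TimeSortingBuffer {

  /** One buffered element: an event timestamp plus its payload. */
  static final class Stamped {
    final long timestampMillis;
    final String value;

    Stamped(long timestampMillis, String value) {
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  // This queue is the "state": elements wait here until the watermark passes them.
  private final PriorityQueue<Stamped> buffer =
      new PriorityQueue<>(Comparator.comparingLong((Stamped s) -> s.timestampMillis));

  /** Accepts an element in arbitrary arrival order. */
  void add(long timestampMillis, String value) {
    buffer.add(new Stamped(timestampMillis, value));
  }

  /** Releases, in timestamp order, every buffered value with timestamp <= watermark. */
  List<String> advanceWatermark(long watermarkMillis) {
    List<String> ready = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestampMillis <= watermarkMillis) {
      ready.add(buffer.poll().value);
    }
    return ready;
  }

  /** Number of elements still held back. */
  int buffered() {
    return buffer.size();
  }
}
```

       Without the buffer there would be nothing to reorder: an element arriving early could only be emitted immediately, before elements with smaller timestamps have been seen.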

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -2572,6 +2572,49 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
           false);
     }
 
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithStatelessDoFn() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        stream = stream.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+      }
+      testTimeSortedInputStateless(
+          numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,

Review comment:
       I updated the javadoc, thanks. The comment _Note that this annotation makes sense only for stateful ParDos, because outcome of stateless functions cannot depend on the ordering._ was replaced because it was built on a false premise: even a stateless DoFn can require ordering when it relies on some form of "implicit" state, such as an RPC, an internal clock, or a similar mechanism.
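
       A minimal sketch of the "implicit state" case, again in plain Java rather than Beam: the per-element function below declares no state of its own, yet its observable outcome depends on the order in which elements are processed, because it writes to an external last-write-wins store. `ImplicitStateExample`, `RemoteStore`, `process` and `replay` are hypothetical names, and the in-memory map merely stands in for a service that would be reached over RPC.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not part of Beam. */
final class ImplicitStateExample {

  /** Stand-in for an external service reached over RPC (assumption for this sketch). */
  static final class RemoteStore {
    private final Map<String, Long> latest = new HashMap<>();

    void put(String key, long value) {
      latest.put(key, value); // last write wins
    }

    Long get(String key) {
      return latest.get(key);
    }
  }

  /** "Stateless" per-element function: it keeps nothing itself, it only calls the store. */
  static void process(RemoteStore store, String key, long value) {
    store.put(key, value);
  }

  /** Feeds the values to process() in the given arrival order and returns the final value. */
  static Long replay(List<Long> valuesInArrivalOrder) {
    RemoteStore store = new RemoteStore();
    for (long value : valuesInArrivalOrder) {
      process(store, "k", value);
    }
    return store.get("k");
  }
}
```

       Replaying the same three values in time order and in shuffled order leaves a different final value in the store, which is why such a function can legitimately ask for time-sorted input even though it has no declared state.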
   



