Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/13 00:26:36 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #15540: [BEAM-12931] Allow for DoFn#getAllowedTimestampSkew() when checking the output timestamp

lukecwik commented on a change in pull request #15540:
URL: https://github.com/apache/beam/pull/15540#discussion_r727598137



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1897,14 +1898,26 @@ private Instant minTargetAndGcTime(Instant target) {
       return Timer.cleared(userKey, dynamicTimerTag, Collections.singletonList(boundedWindow));
     }
 
+    @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected
     private Timer<K> getTimerForTime(Instant scheduledTime) {
       if (outputTimestamp != null) {
-        checkArgument(
-            !outputTimestamp.isBefore(elementTimestampOrTimerHoldTimestamp),
-            "output timestamp %s should be after input message timestamp or output timestamp of"
-                + " firing timers %s",
-            outputTimestamp,
-            elementTimestampOrTimerHoldTimestamp);
+        Instant lowerBound;
+        Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+        try {
+          lowerBound = elementTimestampOrTimerHoldTimestamp.minus(doFn.getAllowedTimestampSkew());
+        } catch (ArithmeticException e) {
+          lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+        }
+        if (outputTimestamp.isBefore(lowerBound) || outputTimestamp.isAfter(upperBound)) {

Review comment:
       ```suggestion
           try {
             lowerBound = elementTimestampOrTimerHoldTimestamp.minus(doFn.getAllowedTimestampSkew());
           } catch (ArithmeticException e) {
             lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
           }
           if (outputTimestamp.isBefore(lowerBound) || outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
   ```

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1999,6 +2012,28 @@ public void set(String dynamicTimerTag, Instant absoluteTime) {
     }
   }
 
+  @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected
+  private void checkTimestamp(Instant timestamp) {
+    Instant lowerBound;
+    Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    try {
+      lowerBound = currentElement.getTimestamp().minus(doFn.getAllowedTimestampSkew());
+    } catch (ArithmeticException e) {
+      lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+    if (timestamp.isBefore(lowerBound) || timestamp.isAfter(upperBound)) {

Review comment:
       ```suggestion
       try {
         lowerBound = currentElement.getTimestamp().minus(doFn.getAllowedTimestampSkew());
       } catch (ArithmeticException e) {
         lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
       }
       if (timestamp.isBefore(lowerBound) || timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
   ```

##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
##########
@@ -1190,13 +1195,24 @@ public Timer withOutputTimestamp(Instant outputTimestamp) {
      * </ul>
      */
     private void setAndVerifyOutputTimestamp() {
-
       if (outputTimestamp != null) {
-        checkArgument(
-            !outputTimestamp.isBefore(elementInputTimestamp),
-            "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s",
-            outputTimestamp,
-            elementInputTimestamp);
+        Instant lowerBound;
+        Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+        try {
+          lowerBound = elementInputTimestamp.minus(fn.getAllowedTimestampSkew());
+        } catch (ArithmeticException e) {
+          lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+        }
+        if (outputTimestamp.isBefore(lowerBound) || outputTimestamp.isAfter(upperBound)) {

Review comment:
       ```suggestion
           try {
             lowerBound = elementInputTimestamp.minus(fn.getAllowedTimestampSkew());
           } catch (ArithmeticException e) {
             lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
           }
           if (outputTimestamp.isBefore(lowerBound) || outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
   ```

##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
##########
@@ -386,6 +397,186 @@ public void testInfiniteSkew() {
             BoundedWindow.TIMESTAMP_MAX_VALUE));
   }
 
+  @Test
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+  public void testRunnerNoSkew() {
+    PCollection<Duration> input =
+        p.apply(
+            "create",
+            Create.timestamped(
+                Arrays.asList(new Duration(0L), new Duration(1L)), Arrays.asList(0L, 1L)));
+    input.apply(ParDo.of(new SkewingDoFn(Duration.ZERO)));
+    // The errors differ between runners but at least check that the output timestamp is printed.
+    thrown.expectMessage(String.format("%s", new Instant(0L)));
+    thrown.expectMessage("output");
+    p.run();
+  }
+
+  @Test
+  @Category({UsesTimersInParDo.class, ValidatesRunner.class})
+  public void testRunnerAllowedSkew() {
+    PCollection<Duration> input =
+        p.apply(
+            "create",
+            Create.timestamped(
+                Arrays.asList(new Duration(0L), new Duration(1L)), Arrays.asList(0L, 1L)));
+    input.apply(ParDo.of(new SkewingDoFn(new Duration(2L))));
+    p.run();
+  }
+
+  /**
+   * Demonstrates that attempting to set a timer with an output timestamp before the timestamp of
+   * the current element with zero {@link DoFn#getAllowedTimestampSkew() allowed timestamp skew}
+   * throws.
+   */
+  @Test
+  public void testTimerBackwardsInTimeNoSkew() {
+    TimerSkewingDoFn fn = new TimerSkewingDoFn(Duration.ZERO);
+    DoFnRunner<KV<String, Duration>, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<>(),
+            Collections.emptyList(),
+            mockStepContext,
+            null,
+            Collections.emptyMap(),
+            WindowingStrategy.of(new GlobalWindows()),
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    runner.startBundle();
+    // A timer with output timestamp at the current timestamp is fine.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(KV.of("1", Duration.ZERO), new Instant(0)));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("output timestamp of firing timers");
+    thrown.expectMessage(
+        String.format("output timestamp %s", new Instant(0).minus(Duration.millis(1L))));
+    thrown.expectMessage(
+        String.format(
+            "allowed skew %s", PeriodFormat.getDefault().print(Duration.ZERO.toPeriod())));
+
+    // A timer with output timestamp before (current time - skew) is forbidden
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("2", Duration.millis(1L)), new Instant(0)));
+  }
+
+  /**
+   * Demonstrates that attempting to have a timer with output timestamp before the timestamp of the
+   * current element plus the value of {@link DoFn#getAllowedTimestampSkew()} throws, but between
+   * that value and the current timestamp succeeds.
+   */
+  @Test
+  public void testTimerSkew() {
+    TimerSkewingDoFn fn = new TimerSkewingDoFn(Duration.standardMinutes(10L));
+    DoFnRunner<KV<String, Duration>, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<>(),
+            Collections.emptyList(),
+            mockStepContext,
+            null,
+            Collections.emptyMap(),
+            WindowingStrategy.of(new GlobalWindows()),
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    runner.startBundle();
+    // Timer with output timestamp between "now" and "now - allowed skew" succeeds.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("1", Duration.standardMinutes(5L)), new Instant(0)));
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("output timestamp of firing timers");
+    thrown.expectMessage(
+        String.format("output timestamp %s", new Instant(0).minus(Duration.standardHours(1L))));
+    thrown.expectMessage(
+        String.format(
+            "allowed skew %s",
+            PeriodFormat.getDefault().print(Duration.standardMinutes(10L).toPeriod())));
+    // Timer with output timestamp before "now - allowed skew" fails.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("2", Duration.standardHours(1L)), new Instant(0)));
+  }
+
+  /**
+   * Demonstrates that attempting to output an element with a timestamp before the current one
+   * always succeeds when {@link DoFn#getAllowedTimestampSkew()} is equal to {@link Long#MAX_VALUE}
+   * milliseconds.
+   */
+  @Test
+  public void testTimerInfiniteSkew() {
+    TimerSkewingDoFn fn = new TimerSkewingDoFn(Duration.millis(Long.MAX_VALUE));
+    DoFnRunner<KV<String, Duration>, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<>(),
+            Collections.emptyList(),
+            mockStepContext,
+            null,
+            Collections.emptyMap(),
+            WindowingStrategy.of(new GlobalWindows()),
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    runner.startBundle();
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("1", Duration.millis(1L)), new Instant(0)));
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("2", Duration.millis(1L)),
+            BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1))));
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of(
+                "3",
+                // This is the maximum amount a timestamp in beam can move (from the maximum
+                // timestamp
+                // to the minimum timestamp).
+                Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())
+                    .minus(Duration.millis(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()))),
+            BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
+  @Test
+  @Category({UsesTimersInParDo.class, ValidatesRunner.class})
+  public void testRunnerTimerNoSkew() {
+    List<KV<String, Duration>> durations =
+        Arrays.asList(KV.of("0", new Duration(0L)), KV.of("2", new Duration(1L)));
+    PCollection<KV<String, Duration>> input =
+        p.apply("create", Create.timestamped(durations, Arrays.asList(0L, 2L)));
+    input.apply(ParDo.of(new TimerSkewingDoFn(Duration.ZERO)));
+    // The errors differ between runners but at least check that the output timestamp is printed.
+    thrown.expectMessage(String.format("%s", new Instant(1L)));

Review comment:
       I don't think this will work for Dataflow streaming.
   
   This passes on Jenkins because these tests do not all run in both streaming and batch modes, but it will not work when we run the tests inside Google.

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -3878,6 +3885,182 @@ public void testProcessElementForWindowedTruncateAndSizeRestriction() throws Exc
     }
   }
 
+  @RunWith(JUnit4.class)
+  public static class ExceptionThrowingExecutionTest {
+    @Rule public final ExpectedException thrown = ExpectedException.none();
+
+    public static final String TEST_TRANSFORM_ID = "pTransformId";
+
+    /**
+     * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+     * input element.
+     */
+    private static class SkewingDoFn extends DoFn<String, String> {
+      private final Duration allowedSkew;
+
+      private SkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        Duration duration = new Duration(Long.valueOf(context.element()));
+        context.outputWithTimestamp(context.element(), context.timestamp().minus(duration));
+      }
+
+      @Override
+      public Duration getAllowedTimestampSkew() {
+        return allowedSkew;
+      }
+    }
+
+    @Test
+    public void testDoFnSkewNotAllowed() throws Exception {
+      Pipeline p = Pipeline.create();
+      PCollection<String> valuePCollection = p.apply(Create.of("0", "1"));
+      PCollection<String> outputPCollection =
+          valuePCollection.apply(TEST_TRANSFORM_ID, ParDo.of(new SkewingDoFn(Duration.ZERO)));
+
+      SdkComponents sdkComponents = SdkComponents.create(p.getOptions());
+      RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents);
+      String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
+      String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
+      RunnerApi.PTransform pTransform =
+          pProto
+              .getComponents()
+              .getTransformsOrThrow(
+                  pProto
+                      .getComponents()
+                      .getTransformsOrThrow(TEST_TRANSFORM_ID)
+                      .getSubtransforms(0));
+
+      List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
+      MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
+      PCollectionConsumerRegistry consumers =
+          new PCollectionConsumerRegistry(
+              metricsContainerRegistry, mock(ExecutionStateTracker.class));
+
+      consumers.register(
+          outputPCollectionId,
+          TEST_TRANSFORM_ID,
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add,
+          StringUtf8Coder.of());
+      PTransformFunctionRegistry startFunctionRegistry =
+          new PTransformFunctionRegistry(
+              mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
+      PTransformFunctionRegistry finishFunctionRegistry =
+          new PTransformFunctionRegistry(
+              mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
+      List<ThrowingRunnable> teardownFunctions = new ArrayList<>();
+
+      new FnApiDoFnRunner.Factory<>()
+          .createRunnerForPTransform(
+              PipelineOptionsFactory.create(),
+              null /* beamFnDataClient */,
+              null /* beamFnStateClient */,
+              null /* beamFnTimerClient */,
+              TEST_TRANSFORM_ID,
+              pTransform,
+              Suppliers.ofInstance("57L")::get,
+              pProto.getComponents().getPcollectionsMap(),
+              pProto.getComponents().getCodersMap(),
+              pProto.getComponents().getWindowingStrategiesMap(),
+              consumers,
+              startFunctionRegistry,
+              finishFunctionRegistry,
+              null /* addResetFunction */,
+              teardownFunctions::add,
+              null /* addProgressRequestCallback */,
+              null /* splitListener */,
+              null /* bundleFinalizer */);
+
+      thrown.expect(UserCodeException.class);
+      thrown.expectMessage(String.format("timestamp %s", new Instant(0).minus(new Duration(1L))));
+      thrown.expectMessage(
+          String.format(
+              "allowed skew (%s)", PeriodFormat.getDefault().print(Duration.ZERO.toPeriod())));
+
+      Iterables.getOnlyElement(startFunctionRegistry.getFunctions()).run();
+      mainOutputValues.clear();
+
+      FnDataReceiver<WindowedValue<?>> mainInput =
+          consumers.getMultiplexingConsumer(inputPCollectionId);
+      mainInput.accept(valueInGlobalWindow("0"));
+      mainInput.accept(timestampedValueInGlobalWindow("1", new Instant(0L)));

Review comment:
       Please switch to `assertThrows`. `thrown.expect` is deprecated because it does a poor job of showing which line of code is causing the exception.
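
To illustrate the point about locating the failing line, here is a minimal sketch that scopes the expectation to a single call. It deliberately avoids a JUnit dependency: the local `assertThrows` and `ThrowingRunnable` are hypothetical stand-ins that assume the same shape as JUnit 4.13's `Assert.assertThrows`, and `accept` is a hypothetical stand-in for feeding an element to the runner.

```java
final class ScopedExpectation {
  /** Hypothetical stand-in for JUnit's ThrowingRunnable. */
  interface ThrowingRunnable {
    void run() throws Throwable;
  }

  /** Hypothetical stand-in assumed to mirror JUnit 4.13's Assert.assertThrows. */
  static <T extends Throwable> T assertThrows(Class<T> expected, ThrowingRunnable body) {
    try {
      body.run();
    } catch (Throwable t) {
      if (expected.isInstance(t)) {
        return expected.cast(t); // Hand the exception back for further assertions.
      }
      throw new AssertionError("unexpected exception type " + t.getClass().getName(), t);
    }
    throw new AssertionError("expected " + expected.getName() + " but nothing was thrown");
  }

  /** Hypothetical stand-in for a runner rejecting an output timestamp before the input. */
  static void accept(long outputMillis, long inputMillis) {
    if (outputMillis < inputMillis) {
      throw new IllegalArgumentException("Cannot output with timestamp " + outputMillis);
    }
  }

  public static void main(String[] args) {
    // This call is outside the lambda, so an exception here fails the test directly.
    accept(0L, 0L);

    // Only this call is expected to throw.
    IllegalArgumentException e =
        assertThrows(IllegalArgumentException.class, () -> accept(-1L, 0L));
    System.out.println(e.getMessage());
  }
}
```

With this shape, setup calls that throw unexpectedly are reported as failures instead of silently satisfying the expectation, and the returned exception can be inspected with ordinary assertions.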

##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
##########
@@ -472,6 +663,37 @@ public Duration getAllowedTimestampSkew() {
     }
   }
 
+  /**
+   * A {@link DoFn} that creates/sets a timer with an output timestamp equal to the input timestamp
+   * minus the input element's value. Keys are ignored but required for timers.
+   */
+  private static class TimerSkewingDoFn extends DoFn<KV<String, Duration>, Duration> {
+    static final String TIMER_ID = "testTimerId";
+    private final Duration allowedSkew;
+
+    @TimerId(TIMER_ID)
+    private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    private TimerSkewingDoFn(Duration allowedSkew) {
+      this.allowedSkew = allowedSkew;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
+      timer
+          .withOutputTimestamp(context.timestamp().minus(context.element().getValue()))
+          .set(new Instant(0));
+    }
+
+    @OnTimer(TIMER_ID)
+    public void onTimer() {}

Review comment:
       It would be nice to also test the variant where timers are set within `onTimer`, not just when they are set within `processElement`.

##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
##########
@@ -386,6 +399,180 @@ public void testInfiniteSkew() {
             BoundedWindow.TIMESTAMP_MAX_VALUE));
   }
 
+  @Test
+  @Category({ValidatesRunner.class})
+  public void testRunnerNoSkew() {
+    PCollection<Duration> input =
+        p.apply("create", Create.timestamped(Arrays.asList(new Duration(0L), new Duration(1L)), Arrays.asList(0L, 1L)));
+    input.apply(ParDo.of(new SkewingDoFn(Duration.ZERO)));
+    // The errors differ between runners but at least check that the output timestamp is printed.
+    thrown.expectMessage(String.format("%s", new Instant(0L)));

Review comment:
       The issue is that `thrown.expect` relies on `Pipeline.run` throwing the underlying cause of the job failure. I don't think all runners (e.g. Dataflow streaming) properly propagate the error that caused the pipeline to fail. The other failure tests in `ParDoTest.java` use `NeedsRunner`, which runs them with the DirectRunner rather than with all runners. So if you only want to test the DirectRunner, mark the test as `NeedsRunner` instead of `ValidatesRunner`.

##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
##########
@@ -437,19 +437,24 @@ public Instant timestamp() {
 
     @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected
     private void checkTimestamp(Instant timestamp) {
-      // The documentation of getAllowedTimestampSkew explicitly permits Long.MAX_VALUE to be used
-      // for infinite skew. Defend against underflow in that case for timestamps before the epoch
-      if (fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE
-          && timestamp.isBefore(elem.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+      Instant lowerBound;
+      Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      try {
+        lowerBound = elem.getTimestamp().minus(fn.getAllowedTimestampSkew());
+      } catch (ArithmeticException e) {
+        lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+      if (timestamp.isBefore(lowerBound) || timestamp.isAfter(upperBound)) {

Review comment:
       ```suggestion
         try {
           lowerBound = elem.getTimestamp().minus(fn.getAllowedTimestampSkew());
         } catch (ArithmeticException e) {
           lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
         }
         if (timestamp.isBefore(lowerBound) || timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
   ```
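
For readers following the thread, the `try`/`catch` in these suggestions exists because subtracting a very large allowed skew from a timestamp can overflow (the removed comment notes that `Long.MAX_VALUE` is explicitly permitted for infinite skew), in which case the lower bound falls back to the minimum timestamp. Below is a minimal sketch of that clamping using plain `long` milliseconds. It does not use Joda-Time or Beam: `SkewBounds`, `lowerBound`, `isAllowed`, and both constants are hypothetical stand-ins with placeholder values, and `Math.subtractExact` is assumed to play the role of the overflow-checked `Instant.minus`.

```java
final class SkewBounds {
  // Placeholder stand-ins for BoundedWindow.TIMESTAMP_MIN_VALUE and TIMESTAMP_MAX_VALUE.
  // The concrete values here are assumptions made for illustration only.
  static final long MIN_MILLIS = Long.MIN_VALUE / 1000;
  static final long MAX_MILLIS = Long.MAX_VALUE / 1000;

  /** Returns input minus skew, or the minimum timestamp if the subtraction overflows. */
  static long lowerBound(long inputMillis, long allowedSkewMillis) {
    try {
      return Math.subtractExact(inputMillis, allowedSkewMillis);
    } catch (ArithmeticException e) {
      return MIN_MILLIS;
    }
  }

  /** True when the output timestamp lies within [lowerBound, MAX_MILLIS]. */
  static boolean isAllowed(long outputMillis, long inputMillis, long allowedSkewMillis) {
    return outputMillis >= lowerBound(inputMillis, allowedSkewMillis)
        && outputMillis <= MAX_MILLIS;
  }

  public static void main(String[] args) {
    // Zero skew: an output one millisecond before the input is rejected.
    System.out.println(isAllowed(-1L, 0L, 0L));
    // Two milliseconds of skew: the same output is accepted.
    System.out.println(isAllowed(-1L, 0L, 2L));
    // "Infinite" skew on a pre-epoch input overflows and clamps to the minimum.
    System.out.println(lowerBound(-5L, Long.MAX_VALUE) == MIN_MILLIS);
  }
}
```

Catching the overflow removes the need for the earlier special case that compared the skew against `Long.MAX_VALUE` before subtracting.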

##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
##########
@@ -437,19 +437,24 @@ public Instant timestamp() {
 
     @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected
     private void checkTimestamp(Instant timestamp) {
-      // The documentation of getAllowedTimestampSkew explicitly permits Long.MAX_VALUE to be used
-      // for infinite skew. Defend against underflow in that case for timestamps before the epoch
-      if (fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE
-          && timestamp.isBefore(elem.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+      Instant lowerBound;
+      Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      try {
+        lowerBound = elem.getTimestamp().minus(fn.getAllowedTimestampSkew());
+      } catch (ArithmeticException e) {
+        lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+      if (timestamp.isBefore(lowerBound) || timestamp.isAfter(upperBound)) {
         throw new IllegalArgumentException(
             String.format(
                 "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
-                    + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-                    + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed "
-                    + "skew.",
+                    + "timestamp of the current input (%s) minus the allowed skew (%s) and no "
+                    + "later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details "
+                    + "on changing the allowed skew.",
                 timestamp,
                 elem.getTimestamp(),
-                PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
+                PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
+                upperBound));

Review comment:
       ```suggestion
                   BoundedWindow.TIMESTAMP_MAX_VALUE));
   ```

##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
##########
@@ -1190,13 +1195,24 @@ public Timer withOutputTimestamp(Instant outputTimestamp) {
      * </ul>
      */
     private void setAndVerifyOutputTimestamp() {
-
       if (outputTimestamp != null) {
-        checkArgument(
-            !outputTimestamp.isBefore(elementInputTimestamp),
-            "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s",
-            outputTimestamp,
-            elementInputTimestamp);
+        Instant lowerBound;
+        Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+        try {
+          lowerBound = elementInputTimestamp.minus(fn.getAllowedTimestampSkew());
+        } catch (ArithmeticException e) {
+          lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+        }
+        if (outputTimestamp.isBefore(lowerBound) || outputTimestamp.isAfter(upperBound)) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "output timestamp %s (allowed skew %s) should be after input message timestamp or"
+                      + " output timestamp of firing timers %s and before %s",
+                  outputTimestamp,
+                  PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
+                  elementInputTimestamp,
+                  upperBound));

Review comment:
       The previous message isn't great. Can we improve this to be similar to the other message, like:
   ```
   Cannot output timer with timestamp %s. Output timestamps must be no earlier than the timestamp of the current input (%s) minus the allowed skew (%s) and no later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
   ```
   
   Ditto here and in `FnApiDoFnRunner.java`
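
A minimal sketch of how the proposed message could be produced in one place, so that both runners share the same wording. `TimerTimestampMessage` and `format` are hypothetical names, not part of the PR, and the arguments are typed as `Object` only to keep the sketch free of Joda-Time and Beam types.

```java
final class TimerTimestampMessage {
  /**
   * Formats the proposed error message. Argument order follows the placeholders:
   * output timestamp, input timestamp, allowed skew, upper bound.
   */
  static String format(
      Object outputTimestamp, Object inputTimestamp, Object allowedSkew, Object upperBound) {
    return String.format(
        "Cannot output timer with timestamp %s. Output timestamps must be no earlier than the "
            + "timestamp of the current input (%s) minus the allowed skew (%s) and no later than "
            + "%s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the "
            + "allowed skew.",
        outputTimestamp,
        inputTimestamp,
        allowedSkew,
        upperBound);
  }

  public static void main(String[] args) {
    // Illustrative values only.
    System.out.println(format("-1ms", "0ms", "0 milliseconds", "MAX"));
  }
}
```

Note that the placeholder order differs from the message currently in the diff, which lists the allowed skew before the input timestamp, so the arguments would need to be reordered when switching to this wording.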

##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
##########
@@ -386,6 +397,186 @@ public void testInfiniteSkew() {
             BoundedWindow.TIMESTAMP_MAX_VALUE));
   }
 
+  @Test
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+  public void testRunnerNoSkew() {

Review comment:
       Please move the `ValidatesRunner` tests to `ParDoTest.java` since that is where the bulk of the other ParDo-specific `ValidatesRunner` and `NeedsRunner` tests exist.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1999,6 +2012,28 @@ public void set(String dynamicTimerTag, Instant absoluteTime) {
     }
   }
 
+  @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected
+  private void checkTimestamp(Instant timestamp) {
+    Instant lowerBound;
+    Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    try {
+      lowerBound = currentElement.getTimestamp().minus(doFn.getAllowedTimestampSkew());
+    } catch (ArithmeticException e) {
+      lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+    if (timestamp.isBefore(lowerBound) || timestamp.isAfter(upperBound)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+                  + "timestamp of the current input (%s) minus the allowed skew (%s). See the "

Review comment:
       This message differs from the one in SimpleDoFnRunner as it is missing the upper bound.



