You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2020/02/09 07:55:39 UTC

[beam] branch master updated: Merge pull request #10627:[BEAM-2535] Support outputTimestamp and watermark holds in processing timers.

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a005fd7  Merge pull request #10627:[BEAM-2535] Support outputTimestamp and watermark holds in processing timers.
a005fd7 is described below

commit a005fd765a762183ca88df90f261f6d4a20cf3e0
Author: Rehman <re...@gmail.com>
AuthorDate: Sun Feb 9 12:55:27 2020 +0500

    Merge pull request #10627:[BEAM-2535] Support outputTimestamp and watermark holds in processing timers.
---
 .../apache/beam/runners/core/SimpleDoFnRunner.java | 80 +++++++++++++-----
 .../beam/runners/direct/WatermarkManager.java      | 47 +++++++++--
 .../beam/runners/direct/WatermarkManagerTest.java  |  5 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 97 +++++++++++++++++++++-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  1 -
 5 files changed, 199 insertions(+), 31 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index aaae986..98d8e04 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -762,7 +762,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       try {
         TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
         return new TimerInternalsTimer(
-            window(), getNamespace(), timerId, spec, stepContext.timerInternals());
+            window(), getNamespace(), timerId, spec, timestamp(), stepContext.timerInternals());
       } catch (IllegalAccessException e) {
         throw new RuntimeException(e);
       }
@@ -774,7 +774,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         TimerSpec spec =
             (TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn);
         return new TimerInternalsTimerMap(
-            timerFamilyId, window(), getNamespace(), spec, stepContext.timerInternals());
+            timerFamilyId,
+            window(),
+            getNamespace(),
+            spec,
+            timestamp(),
+            stepContext.timerInternals());
       } catch (IllegalAccessException e) {
         throw new RuntimeException(e);
       }
@@ -949,7 +954,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       try {
         TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
         return new TimerInternalsTimer(
-            window, getNamespace(), timerId, spec, stepContext.timerInternals());
+            window, getNamespace(), timerId, spec, timestamp(), stepContext.timerInternals());
       } catch (IllegalAccessException e) {
         throw new RuntimeException(e);
       }
@@ -961,7 +966,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         TimerSpec spec =
             (TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn);
         return new TimerInternalsTimerMap(
-            timerFamilyId, window(), getNamespace(), spec, stepContext.timerInternals());
+            timerFamilyId,
+            window(),
+            getNamespace(),
+            spec,
+            timestamp(),
+            stepContext.timerInternals());
       } catch (IllegalAccessException e) {
         throw new RuntimeException(e);
       }
@@ -1006,6 +1016,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     private final TimerSpec spec;
     private Instant target;
     private Instant outputTimestamp;
+    private final Instant elementInputTimestamp;
     private Duration period = Duration.ZERO;
     private Duration offset = Duration.ZERO;
 
@@ -1014,12 +1025,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         StateNamespace namespace,
         String timerId,
         TimerSpec spec,
+        Instant elementInputTimestamp,
         TimerInternals timerInternals) {
       this.window = window;
       this.namespace = namespace;
       this.timerId = timerId;
       this.timerFamilyId = "";
       this.spec = spec;
+      this.elementInputTimestamp = elementInputTimestamp;
       this.timerInternals = timerInternals;
     }
 
@@ -1029,12 +1042,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         String timerId,
         String timerFamilyId,
         TimerSpec spec,
+        Instant elementInputTimestamp,
         TimerInternals timerInternals) {
       this.window = window;
       this.namespace = namespace;
       this.timerId = timerId;
       this.timerFamilyId = timerFamilyId;
       this.spec = spec;
+      this.elementInputTimestamp = elementInputTimestamp;
       this.timerInternals = timerInternals;
     }
 
@@ -1111,24 +1126,35 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
      * </ul>
      */
     private void setAndVerifyOutputTimestamp() {
-      // Output timestamp is currently not supported in processing time timers.
-      if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
-        throw new IllegalStateException("Cannot set outputTimestamp in processing time domain.");
+
+      if (outputTimestamp != null) {
+        checkArgument(
+            !outputTimestamp.isBefore(elementInputTimestamp),
+            "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s",
+            outputTimestamp,
+            elementInputTimestamp);
       }
+
       // Output timestamp is set to the delivery time if not initialized by an user.
-      if (outputTimestamp == null) {
+      if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
         outputTimestamp = target;
       }
-
-      if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
-        Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
-        checkArgument(
-            !target.isAfter(windowExpiry),
-            "Attempted to set event time timer that outputs for %s but that is"
-                + " after the expiration of window %s",
-            target,
-            windowExpiry);
+      // For processing timers
+      if (outputTimestamp == null) {
+        // For processing timers output timestamp will be:
+        // 1) timestamp of input element
+        // OR
+        // 2) output timestamp of firing timer.
+        outputTimestamp = elementInputTimestamp;
       }
+
+      Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
+      checkArgument(
+          !target.isAfter(windowExpiry),
+          "Attempted to set event time timer that outputs for %s but that is"
+              + " after the expiration of window %s",
+          target,
+          windowExpiry);
     }
 
     /**
@@ -1163,6 +1189,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     private final BoundedWindow window;
     private final StateNamespace namespace;
     private final TimerSpec spec;
+    private final Instant elementInputTimestamp;
     private final String timerFamilyId;
 
     public TimerInternalsTimerMap(
@@ -1170,10 +1197,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         BoundedWindow window,
         StateNamespace namespace,
         TimerSpec spec,
+        Instant elementInputTimestamp,
         TimerInternals timerInternals) {
       this.window = window;
       this.namespace = namespace;
       this.spec = spec;
+      this.elementInputTimestamp = elementInputTimestamp;
       this.timerInternals = timerInternals;
       this.timerFamilyId = timerFamilyId;
     }
@@ -1181,7 +1210,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     @Override
     public void set(String timerId, Instant absoluteTime) {
       Timer timer =
-          new TimerInternalsTimer(window, namespace, timerId, timerFamilyId, spec, timerInternals);
+          new TimerInternalsTimer(
+              window,
+              namespace,
+              timerId,
+              timerFamilyId,
+              spec,
+              elementInputTimestamp,
+              timerInternals);
       timer.set(absoluteTime);
       timers.put(timerId, timer);
     }
@@ -1191,7 +1227,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       if (timers.get(timerId) == null) {
         Timer timer =
             new TimerInternalsTimer(
-                window, namespace, timerId, timerFamilyId, spec, timerInternals);
+                window,
+                namespace,
+                timerId,
+                timerFamilyId,
+                spec,
+                elementInputTimestamp,
+                timerInternals);
         timers.put(timerId, timer);
       }
       return timers.get(timerId);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 265ebb1..35cfd23 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -327,10 +327,19 @@ public class WatermarkManager<ExecutableT, CollectionT> {
       if (pendingTimers.isEmpty()) {
         return BoundedWindow.TIMESTAMP_MAX_VALUE;
       } else {
-        return pendingTimers.firstEntry().getElement().getOutputTimestamp();
+        return getMinimumOutputTimestamp(pendingTimers);
       }
     }
 
+    private Instant getMinimumOutputTimestamp(SortedMultiset<TimerData> timers) {
+      Instant minimumOutputTimestamp = timers.firstEntry().getElement().getOutputTimestamp();
+      for (TimerData timerData : timers) {
+        minimumOutputTimestamp =
+            INSTANT_ORDERING.min(timerData.getOutputTimestamp(), minimumOutputTimestamp);
+      }
+      return minimumOutputTimestamp;
+    }
+
     @VisibleForTesting
     synchronized void updateTimers(TimerUpdate update) {
       NavigableSet<TimerData> keyTimers =
@@ -597,20 +606,29 @@ public class WatermarkManager<ExecutableT, CollectionT> {
       Instant earliest = THE_END_OF_TIME.get();
       for (NavigableSet<TimerData> timers : processingTimers.values()) {
         if (!timers.isEmpty()) {
-          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
+          earliest = INSTANT_ORDERING.min(getMinimumOutputTimestamp(timers), earliest);
         }
       }
       for (NavigableSet<TimerData> timers : synchronizedProcessingTimers.values()) {
         if (!timers.isEmpty()) {
-          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
+          earliest = INSTANT_ORDERING.min(getMinimumOutputTimestamp(timers), earliest);
         }
       }
       if (!pendingTimers.isEmpty()) {
-        earliest = INSTANT_ORDERING.min(pendingTimers.first().getTimestamp(), earliest);
+        earliest = INSTANT_ORDERING.min(getMinimumOutputTimestamp(pendingTimers), earliest);
       }
       return earliest;
     }
 
+    private Instant getMinimumOutputTimestamp(NavigableSet<TimerData> timers) {
+      Instant minimumOutputTimestamp = timers.first().getOutputTimestamp();
+      for (TimerData timerData : timers) {
+        minimumOutputTimestamp =
+            INSTANT_ORDERING.min(timerData.getOutputTimestamp(), minimumOutputTimestamp);
+      }
+      return minimumOutputTimestamp;
+    }
+
     private synchronized void updateTimers(TimerUpdate update) {
       Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
       Table<StateNamespace, String, TimerData> existingTimersForKey =
@@ -738,15 +756,25 @@ public class WatermarkManager<ExecutableT, CollectionT> {
     private final String name;
 
     private final SynchronizedProcessingTimeInputWatermark inputWm;
+    private final PerKeyHolds holds;
     private AtomicReference<Instant> latestRefresh;
 
     public SynchronizedProcessingTimeOutputWatermark(
         String name, SynchronizedProcessingTimeInputWatermark inputWm) {
       this.name = name;
       this.inputWm = inputWm;
+      holds = new PerKeyHolds();
       this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
     }
 
+    public synchronized void updateHold(Object key, Instant newHold) {
+      if (newHold == null) {
+        holds.removeHold(key);
+      } else {
+        holds.updateHold(key, newHold);
+      }
+    }
+
     @Override
     public String getName() {
       return name;
@@ -780,7 +808,8 @@ public class WatermarkManager<ExecutableT, CollectionT> {
       // downstream timers to.
       Instant oldRefresh = latestRefresh.get();
       Instant newTimestamp =
-          INSTANT_ORDERING.min(inputWm.get(), inputWm.getEarliestTimerTimestamp());
+          INSTANT_ORDERING.min(
+              inputWm.get(), holds.getMinHold(), inputWm.getEarliestTimerTimestamp());
       latestRefresh.set(newTimestamp);
       return updateAndTrace(getName(), oldRefresh, newTimestamp);
     }
@@ -788,6 +817,7 @@ public class WatermarkManager<ExecutableT, CollectionT> {
     @Override
     public synchronized String toString() {
       return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
+          .add("holds", holds)
           .add("latestRefresh", latestRefresh)
           .toString();
     }
@@ -1133,6 +1163,9 @@ public class WatermarkManager<ExecutableT, CollectionT> {
     TransformWatermarks transformWms = transformToWatermarks.get(executable);
     transformWms.setEventTimeHold(
         inputBundle == null ? null : inputBundle.getKey(), pending.getEarliestHold());
+
+    transformWms.setSynchronizedProcessingTimeHold(
+        inputBundle == null ? null : inputBundle.getKey(), pending.getEarliestHold());
   }
 
   /**
@@ -1438,6 +1471,10 @@ public class WatermarkManager<ExecutableT, CollectionT> {
       outputWatermark.updateHold(key, newHold);
     }
 
+    private void setSynchronizedProcessingTimeHold(Object key, Instant newHold) {
+      synchronizedProcessingOutputWatermark.updateHold(key, newHold);
+    }
+
     private void removePending(Bundle<?, ?> bundle) {
       inputWatermark.removePending(bundle);
       synchronizedProcessingInputWatermark.removePending(bundle);
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 5e9cfc2..eef43c7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -1052,10 +1052,9 @@ public class WatermarkManagerTest implements Serializable {
         Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
-    assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
 
     clock.set(new Instant(Long.MAX_VALUE));
-    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), equalTo(new Instant(4096)));
+
     assertThat(
         filteredDoubledWms.getSynchronizedProcessingOutputTime(),
         not(greaterThan(new Instant(4096))));
@@ -1161,7 +1160,7 @@ public class WatermarkManagerTest implements Serializable {
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(lessThan(clock.now())));
+    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(greaterThan(clock.now())));
   }
 
   @Test
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 65f7003..6bcb1fe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -97,6 +97,7 @@ import org.apache.beam.sdk.testing.UsesSideInputsWithDifferentCoders;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.testing.UsesStrictTimerOrdering;
 import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp;
 import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
 import org.apache.beam.sdk.testing.UsesTimerMap;
 import org.apache.beam.sdk.testing.UsesTimersInParDo;
@@ -3936,8 +3937,12 @@ public class ParDoTest implements Serializable {
 
             @ProcessElement
             public void processElement(
-                @TimerId(timerId) Timer timer, OutputReceiver<KV<String, Long>> o) {
-              timer.withOutputTimestamp(new Instant(5)).set(new Instant(10));
+                @TimerId(timerId) Timer timer,
+                @Timestamp Instant timestamp,
+                OutputReceiver<KV<String, Long>> o) {
+              timer
+                  .withOutputTimestamp(timestamp.plus(Duration.millis(5)))
+                  .set(timestamp.plus(Duration.millis(10)));
               // Output a message. This will cause the next DoFn to set a timer as well.
               o.output(KV.of("foo", 100L));
             }
@@ -3958,6 +3963,7 @@ public class ParDoTest implements Serializable {
             @ProcessElement
             public void processElement(
                 @TimerId(timerId) Timer timer,
+                @Timestamp Instant timestamp,
                 @StateId("timerFired") ValueState<Boolean> timerFiredState) {
               Boolean timerFired = timerFiredState.read();
               assertTrue(timerFired == null || !timerFired);
@@ -3966,7 +3972,7 @@ public class ParDoTest implements Serializable {
               // DoFn timer's watermark hold. This timer should not fire until the previous timer
               // fires and removes
               // the watermark hold.
-              timer.set(new Instant(8));
+              timer.set(timestamp.plus(Duration.millis(8)));
             }
 
             @OnTimer(timerId)
@@ -3996,6 +4002,91 @@ public class ParDoTest implements Serializable {
       pipeline.run();
     }
 
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesTimersInParDo.class,
+      UsesTestStreamWithProcessingTime.class,
+      UsesTestStreamWithOutputTimestamp.class
+    })
+    public void testOutputTimestampWithProcessingTime() {
+      final String timerId = "foo";
+      DoFn<KV<String, Integer>, KV<String, Integer>> fn1 =
+          new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+            @TimerId(timerId)
+            private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(
+                @TimerId(timerId) Timer timer,
+                @Timestamp Instant timestamp,
+                OutputReceiver<KV<String, Integer>> o) {
+              timer
+                  .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5)))
+                  .offset(Duration.standardSeconds(10))
+                  .setRelative();
+              // Output a message. This will cause the next DoFn to set a timer as well.
+              o.output(KV.of("foo", 100));
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(OnTimerContext c, BoundedWindow w) {}
+          };
+
+      DoFn<KV<String, Integer>, Integer> fn2 =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @StateId("timerFired")
+            final StateSpec<ValueState<Boolean>> timerFiredState = StateSpecs.value();
+
+            @ProcessElement
+            public void processElement(
+                @TimerId(timerId) Timer timer,
+                @StateId("timerFired") ValueState<Boolean> timerFiredState) {
+              Boolean timerFired = timerFiredState.read();
+              assertTrue(timerFired == null || !timerFired);
+              // Set a timer to 8. This is earlier than the previous DoFn's timer, but after the
+              // previous
+              // DoFn timer's watermark hold. This timer should not fire until the previous timer
+              // fires and removes
+              // the watermark hold.
+              timer.set(new Instant(8));
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(
+                @StateId("timerFired") ValueState<Boolean> timerFiredState,
+                OutputReceiver<Integer> o) {
+              timerFiredState.write(true);
+              o.output(100);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .advanceProcessingTime(Duration.standardSeconds(1))
+              // Cause fn2 to set a timer.
+              .addElements(KV.of("key", 1))
+              // Normally this would cause fn2's timer to expire, but it shouldn't here because of
+              // the output timestamp.
+              .advanceProcessingTime(Duration.standardSeconds(9))
+              .advanceWatermarkTo(new Instant(11))
+              // If the timer fired, then this would cause fn2 to fail with an assertion error.
+              .addElements(KV.of("key", 1))
+              .advanceProcessingTime(Duration.standardSeconds(100))
+              .advanceWatermarkToInfinity();
+      PCollection<Integer> output =
+          pipeline.apply(stream).apply("first", ParDo.of(fn1)).apply("second", ParDo.of(fn2));
+
+      PAssert.that(output).containsInAnyOrder(100); // result output
+      pipeline.run();
+    }
+
     private static class TwoTimerTest extends PTransform<PBegin, PDone> {
 
       private static PTransform<PBegin, PDone> of(
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 70d0d0a..35e08a0 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -916,7 +916,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> {
       this.currentOutputTimestamp = outputTime;
       return this;
     }
-
     /**
      * For event time timers the target time should be prior to window GC time. So it returns
      * min(time to set, GC Time of window).