You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 21:59:51 UTC

[2/7] incubator-beam git commit: Hold output watermark according to pending timers

Hold output watermark according to pending timers


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dfe2e62d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dfe2e62d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dfe2e62d

Branch: refs/heads/master
Commit: dfe2e62d103595583e3ca4594cc03885fe1bba16
Parents: 7f14c46
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 20 13:37:40 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 13:45:37 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/WatermarkManager.java   | 59 ++++++++++++++++----
 1 file changed, 48 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfe2e62d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f7bafd1..248fafd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -211,12 +211,18 @@ public class WatermarkManager {
   private static class AppliedPTransformInputWatermark implements Watermark {
     private final Collection<? extends Watermark> inputWatermarks;
     private final SortedMultiset<CommittedBundle<?>> pendingElements;
-    private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
+
+    // This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key
+    // minimum
+    private final SortedMultiset<Instant> pendingTimers;
 
     // Entries in this table represent the authoritative timestamp for which
     // a per-key-and-StateNamespace timer is set.
     private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers;
 
+    // This per-key sorted set allows quick retrieval of timers that should fire for a key
+    private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
+
     private AtomicReference<Instant> currentWatermark;
 
     public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
@@ -224,10 +230,13 @@ public class WatermarkManager {
       // The ordering must order elements by timestamp, and must not compare two distinct elements
       // as equal. This is built on the assumption that any element added as a pending element will
       // be consumed without modifications.
+      //
+      // The same logic is applied for pending timers
       Ordering<CommittedBundle<?>> pendingBundleComparator =
           new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
       this.pendingElements =
           TreeMultiset.create(pendingBundleComparator);
+      this.pendingTimers = TreeMultiset.create();
       this.objectTimers = new HashMap<>();
       this.existingTimers = new HashMap<>();
       currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
@@ -278,6 +287,14 @@ public class WatermarkManager {
       pendingElements.remove(completed);
     }
 
+    private synchronized Instant getEarliestTimerTimestamp() { // earliest pending timer, any key
+      if (pendingTimers.isEmpty()) {
+        return BoundedWindow.TIMESTAMP_MAX_VALUE; // nothing pending: max value leaves a min() as-is
+      } else {
+        return pendingTimers.firstEntry().getElement(); // least timestamp in the sorted multiset
+      }
+    }
+
     private synchronized void updateTimers(TimerUpdate update) {
       NavigableSet<TimerData> keyTimers = objectTimers.get(update.key);
       if (keyTimers == null) {
@@ -291,27 +308,43 @@ public class WatermarkManager {
         existingTimers.put(update.key, existingTimersForKey);
       }
 
-      for (TimerData timer : update.setTimers) {
+      for (TimerData timer : update.getSetTimers()) {
+        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+          @Nullable
+          TimerData existingTimer =
+              existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
+
+          if (existingTimer == null) {
+            pendingTimers.add(timer.getTimestamp());
+            keyTimers.add(timer);
+          } else if (!existingTimer.equals(timer)) {
+            keyTimers.remove(existingTimer);
+            keyTimers.add(timer);
+          } // else the timer is already set identically, so noop
+
+          existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer);
+        }
+      }
+
+      for (TimerData timer : update.getDeletedTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
           @Nullable
           TimerData existingTimer =
               existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
 
           if (existingTimer != null) {
+            pendingTimers.remove(existingTimer.getTimestamp());
             keyTimers.remove(existingTimer);
+            existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
           }
-          keyTimers.add(timer);
-          existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer);
         }
       }
 
-      for (TimerData timer : update.deletedTimers) {
+      for (TimerData timer : update.getCompletedTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          keyTimers.remove(timer);
-          existingTimersForKey.remove(timer.getNamespace(), timer.getTimerId());
+          pendingTimers.remove(timer.getTimestamp());
         }
       }
-      // We don't keep references to timers that have been fired and delivered via #getFiredTimers()
     }
 
     private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
@@ -336,11 +369,12 @@ public class WatermarkManager {
    * {@link #refresh()} for more information.
    */
   private static class AppliedPTransformOutputWatermark implements Watermark {
-    private final Watermark inputWatermark;
+    private final AppliedPTransformInputWatermark inputWatermark;
     private final PerKeyHolds holds;
     private AtomicReference<Instant> currentWatermark;
 
-    public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) {
+    public AppliedPTransformOutputWatermark(
+        AppliedPTransformInputWatermark inputWatermark) {
       this.inputWatermark = inputWatermark;
       holds = new PerKeyHolds();
       currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
@@ -377,7 +411,10 @@ public class WatermarkManager {
     @Override
     public synchronized WatermarkUpdate refresh() {
       Instant oldWatermark = currentWatermark.get();
-      Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold());
+      Instant newWatermark = INSTANT_ORDERING.min(
+          inputWatermark.get(),
+          inputWatermark.getEarliestTimerTimestamp(),
+          holds.getMinHold());
       newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
       currentWatermark.set(newWatermark);
       return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);