You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 21:59:53 UTC

[4/7] incubator-beam git commit: Allow setting timer by ID in DirectTimerInternals

Allow setting timer by ID in DirectTimerInternals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f14c463
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f14c463
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f14c463

Branch: refs/heads/master
Commit: 7f14c463acd2ae5b86ac81a9528ac4aa7dff765f
Parents: 4d71924
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:18:44 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 13:45:37 2016 -0800

----------------------------------------------------------------------
 .../runners/direct/DirectTimerInternals.java    |  2 +-
 .../beam/runners/direct/WatermarkManager.java   | 25 ++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 5ca276d..80e0721 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -49,7 +49,7 @@ class DirectTimerInternals implements TimerInternals {
   @Override
   public void setTimer(StateNamespace namespace, String timerId, Instant target,
       TimeDomain timeDomain) {
-    throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
+    timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target, timeDomain));
   }
 
   @Deprecated

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 7bed751..f7bafd1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -23,11 +23,13 @@ import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.Table;
 import com.google.common.collect.TreeMultiset;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -56,6 +58,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.joda.time.Instant;
@@ -210,6 +213,10 @@ public class WatermarkManager {
     private final SortedMultiset<CommittedBundle<?>> pendingElements;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
 
+    // Entries in this table represent the authoritative timestamp for which
+    // a per-key-and-StateNamespace timer is set.
+    private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers;
+
     private AtomicReference<Instant> currentWatermark;
 
     public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
@@ -222,6 +229,7 @@ public class WatermarkManager {
       this.pendingElements =
           TreeMultiset.create(pendingBundleComparator);
       this.objectTimers = new HashMap<>();
+      this.existingTimers = new HashMap<>();
       currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
     }
 
@@ -276,14 +284,31 @@ public class WatermarkManager {
         keyTimers = new TreeSet<>();
         objectTimers.put(update.key, keyTimers);
       }
+      Table<StateNamespace, String, TimerData> existingTimersForKey =
+          existingTimers.get(update.key);
+      if (existingTimersForKey == null) {
+        existingTimersForKey = HashBasedTable.create();
+        existingTimers.put(update.key, existingTimersForKey);
+      }
+
       for (TimerData timer : update.setTimers) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+          @Nullable
+          TimerData existingTimer =
+              existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
+
+          if (existingTimer != null) {
+            keyTimers.remove(existingTimer);
+          }
           keyTimers.add(timer);
+          existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer);
         }
       }
+
       for (TimerData timer : update.deletedTimers) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
           keyTimers.remove(timer);
+          existingTimersForKey.remove(timer.getNamespace(), timer.getTimerId());
         }
       }
       // We don't keep references to timers that have been fired and delivered via #getFiredTimers()