You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 21:59:53 UTC
[4/7] incubator-beam git commit: Allow setting timer by ID in
DirectTimerInternals
Allow setting timer by ID in DirectTimerInternals
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f14c463
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f14c463
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f14c463
Branch: refs/heads/master
Commit: 7f14c463acd2ae5b86ac81a9528ac4aa7dff765f
Parents: 4d71924
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:18:44 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 13:45:37 2016 -0800
----------------------------------------------------------------------
.../runners/direct/DirectTimerInternals.java | 2 +-
.../beam/runners/direct/WatermarkManager.java | 25 ++++++++++++++++++++
2 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 5ca276d..80e0721 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -49,7 +49,7 @@ class DirectTimerInternals implements TimerInternals {
@Override
public void setTimer(StateNamespace namespace, String timerId, Instant target,
TimeDomain timeDomain) {
- throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
+ timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target, timeDomain));
}
@Deprecated
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 7bed751..f7bafd1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -23,11 +23,13 @@ import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.Table;
import com.google.common.collect.TreeMultiset;
import java.io.Serializable;
import java.util.ArrayList;
@@ -56,6 +58,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TaggedPValue;
import org.joda.time.Instant;
@@ -210,6 +213,10 @@ public class WatermarkManager {
private final SortedMultiset<CommittedBundle<?>> pendingElements;
private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
+ // Entries in this table represent the authoritative timestamp for which
+ // a per-key-and-StateNamespace timer is set.
+ private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers;
+
private AtomicReference<Instant> currentWatermark;
public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
@@ -222,6 +229,7 @@ public class WatermarkManager {
this.pendingElements =
TreeMultiset.create(pendingBundleComparator);
this.objectTimers = new HashMap<>();
+ this.existingTimers = new HashMap<>();
currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@@ -276,14 +284,31 @@ public class WatermarkManager {
keyTimers = new TreeSet<>();
objectTimers.put(update.key, keyTimers);
}
+ Table<StateNamespace, String, TimerData> existingTimersForKey =
+ existingTimers.get(update.key);
+ if (existingTimersForKey == null) {
+ existingTimersForKey = HashBasedTable.create();
+ existingTimers.put(update.key, existingTimersForKey);
+ }
+
for (TimerData timer : update.setTimers) {
if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+ @Nullable
+ TimerData existingTimer =
+ existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
+
+ if (existingTimer != null) {
+ keyTimers.remove(existingTimer);
+ }
keyTimers.add(timer);
+ existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer);
}
}
+
for (TimerData timer : update.deletedTimers) {
if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
keyTimers.remove(timer);
+ existingTimersForKey.remove(timer.getNamespace(), timer.getTimerId());
}
}
// We don't keep references to timers that have been fired and delivered via #getFiredTimers()