You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 21:59:51 UTC
[2/7] incubator-beam git commit: Hold output watermark according to pending timers
Hold output watermark according to pending timers
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dfe2e62d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dfe2e62d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dfe2e62d
Branch: refs/heads/master
Commit: dfe2e62d103595583e3ca4594cc03885fe1bba16
Parents: 7f14c46
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 20 13:37:40 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 13:45:37 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/WatermarkManager.java | 59 ++++++++++++++++----
1 file changed, 48 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfe2e62d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f7bafd1..248fafd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -211,12 +211,18 @@ public class WatermarkManager {
private static class AppliedPTransformInputWatermark implements Watermark {
private final Collection<? extends Watermark> inputWatermarks;
private final SortedMultiset<CommittedBundle<?>> pendingElements;
- private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
+
+ // This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key
+ // minimum
+ private final SortedMultiset<Instant> pendingTimers;
// Entries in this table represent the authoritative timestamp for which
// a per-key-and-StateNamespace timer is set.
private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers;
+ // This per-key sorted set allows quick retrieval of timers that should fire for a key
+ private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
+
private AtomicReference<Instant> currentWatermark;
public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
@@ -224,10 +230,13 @@ public class WatermarkManager {
// The ordering must order elements by timestamp, and must not compare two distinct elements
// as equal. This is built on the assumption that any element added as a pending element will
// be consumed without modifications.
+ //
+ // The same logic is applied for pending timers
Ordering<CommittedBundle<?>> pendingBundleComparator =
new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
this.pendingElements =
TreeMultiset.create(pendingBundleComparator);
+ this.pendingTimers = TreeMultiset.create();
this.objectTimers = new HashMap<>();
this.existingTimers = new HashMap<>();
currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
@@ -278,6 +287,14 @@ public class WatermarkManager {
pendingElements.remove(completed);
}
+ private synchronized Instant getEarliestTimerTimestamp() {
+ if (pendingTimers.isEmpty()) {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ } else {
+ return pendingTimers.firstEntry().getElement();
+ }
+ }
+
private synchronized void updateTimers(TimerUpdate update) {
NavigableSet<TimerData> keyTimers = objectTimers.get(update.key);
if (keyTimers == null) {
@@ -291,27 +308,43 @@ public class WatermarkManager {
existingTimers.put(update.key, existingTimersForKey);
}
- for (TimerData timer : update.setTimers) {
+ for (TimerData timer : update.getSetTimers()) {
+ if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+ @Nullable
+ TimerData existingTimer =
+ existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
+
+ if (existingTimer == null) {
+ pendingTimers.add(timer.getTimestamp());
+ keyTimers.add(timer);
+ } else if (!existingTimer.equals(timer)) {
+ keyTimers.remove(existingTimer);
+ keyTimers.add(timer);
+ } // else the timer is already set identically, so noop
+
+ existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer);
+ }
+ }
+
+ for (TimerData timer : update.getDeletedTimers()) {
if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
@Nullable
TimerData existingTimer =
existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
if (existingTimer != null) {
+ pendingTimers.remove(existingTimer.getTimestamp());
keyTimers.remove(existingTimer);
+ existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
}
- keyTimers.add(timer);
- existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer);
}
}
- for (TimerData timer : update.deletedTimers) {
+ for (TimerData timer : update.getCompletedTimers()) {
if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
- keyTimers.remove(timer);
- existingTimersForKey.remove(timer.getNamespace(), timer.getTimerId());
+ pendingTimers.remove(timer.getTimestamp());
}
}
- // We don't keep references to timers that have been fired and delivered via #getFiredTimers()
}
private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
@@ -336,11 +369,12 @@ public class WatermarkManager {
* {@link #refresh()} for more information.
*/
private static class AppliedPTransformOutputWatermark implements Watermark {
- private final Watermark inputWatermark;
+ private final AppliedPTransformInputWatermark inputWatermark;
private final PerKeyHolds holds;
private AtomicReference<Instant> currentWatermark;
- public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) {
+ public AppliedPTransformOutputWatermark(
+ AppliedPTransformInputWatermark inputWatermark) {
this.inputWatermark = inputWatermark;
holds = new PerKeyHolds();
currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
@@ -377,7 +411,10 @@ public class WatermarkManager {
@Override
public synchronized WatermarkUpdate refresh() {
Instant oldWatermark = currentWatermark.get();
- Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold());
+ Instant newWatermark = INSTANT_ORDERING.min(
+ inputWatermark.get(),
+ inputWatermark.getEarliestTimerTimestamp(),
+ holds.getMinHold());
newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
currentWatermark.set(newWatermark);
return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);