You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/07 23:48:30 UTC

[1/3] incubator-beam git commit: This closes #1287

Repository: incubator-beam
Updated Branches:
  refs/heads/master 9de9ce69f -> c6d9bf297


This closes #1287


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c6d9bf29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c6d9bf29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c6d9bf29

Branch: refs/heads/master
Commit: c6d9bf29700d6f13a33423183201a18525040e05
Parents: 9de9ce6 a58f1eb
Author: Thomas Groh <tg...@google.com>
Authored: Mon Nov 7 15:47:02 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 15:47:02 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |   5 +
 .../direct/ImmutableListBundleFactory.java      |  21 ++-
 .../beam/runners/direct/WatermarkManager.java   | 153 +++++++++++++------
 .../direct/ImmutableListBundleFactoryTest.java  |  15 +-
 4 files changed, 141 insertions(+), 53 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Track Minimum Element Timestamps within Bundles

Posted by tg...@apache.org.
Track Minimum Element Timestamps within Bundles

This allows the Watermark Manager to track pending elements by bundles
of elements rather than per-element, which significantly reduces the
amount of work done per-element to track watermarks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a58f1eba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a58f1eba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a58f1eba

Branch: refs/heads/master
Commit: a58f1eba1a3b5ae453f4d65ad409785cb717b2ae
Parents: 317b5e6
Author: Thomas Groh <tg...@google.com>
Authored: Fri Nov 4 12:54:00 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 15:47:02 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |  5 ++
 .../direct/ImmutableListBundleFactory.java      | 21 ++++++++-
 .../beam/runners/direct/WatermarkManager.java   | 48 +++++++++-----------
 .../direct/ImmutableListBundleFactoryTest.java  | 15 +++++-
 4 files changed, 60 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 44d1986..4d5a449 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -151,6 +151,11 @@ public class DirectRunner
     Iterable<WindowedValue<T>> getElements();
 
     /**
+     * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}.
+     */
+    Instant getMinTimestamp();
+
+    /**
      * Returns the processing time output watermark at the time the producing {@link PTransform}
      * committed this bundle. Downstream synchronized processing time watermarks cannot progress
      * past this point before consuming this bundle.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index abc6dd8..6b342d6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
@@ -64,6 +65,7 @@ class ImmutableListBundleFactory implements BundleFactory {
     private final StructuralKey<?> key;
     private boolean committed = false;
     private ImmutableList.Builder<WindowedValue<T>> elements;
+    private Instant minSoFar = BoundedWindow.TIMESTAMP_MAX_VALUE;
 
     /**
      * Create a new {@link UncommittedImmutableListBundle} for the specified {@link PCollection}.
@@ -93,6 +95,9 @@ class ImmutableListBundleFactory implements BundleFactory {
           element,
           pcollection);
       elements.add(element);
+      if (element.getTimestamp().isBefore(minSoFar)) {
+        minSoFar = element.getTimestamp();
+      }
       return this;
     }
 
@@ -102,7 +107,7 @@ class ImmutableListBundleFactory implements BundleFactory {
       committed = true;
       final Iterable<WindowedValue<T>> committedElements = elements.build();
       return CommittedImmutableListBundle.create(
-          pcollection, key, committedElements, synchronizedCompletionTime);
+          pcollection, key, committedElements, minSoFar, synchronizedCompletionTime);
     }
   }
 
@@ -112,9 +117,10 @@ class ImmutableListBundleFactory implements BundleFactory {
         @Nullable PCollection<T> pcollection,
         StructuralKey<?> key,
         Iterable<WindowedValue<T>> committedElements,
+        Instant minElementTimestamp,
         Instant synchronizedCompletionTime) {
       return new AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle<>(
-          pcollection, key, committedElements, synchronizedCompletionTime);
+          pcollection, key, committedElements, minElementTimestamp, synchronizedCompletionTime);
     }
 
     @Override
@@ -123,6 +129,7 @@ class ImmutableListBundleFactory implements BundleFactory {
           getPCollection(),
           getKey(),
           ImmutableList.copyOf(elements),
+          minTimestamp(elements),
           getSynchronizedProcessingOutputWatermark());
     }
 
@@ -136,4 +143,14 @@ class ImmutableListBundleFactory implements BundleFactory {
       return this == obj;
     }
   }
+
+  private static Instant minTimestamp(Iterable<? extends WindowedValue<?>> elements) {
+    Instant minTs = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    for (WindowedValue<?> element : elements) {
+      if (element.getTimestamp().isBefore(minTs)) {
+        minTs = element.getTimestamp();
+      }
+    }
+    return minTs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 2228cd5..f235af0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -27,8 +27,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.TreeMultiset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -55,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.joda.time.Instant;
@@ -123,6 +120,10 @@ import org.joda.time.Instant;
  * </pre>
  */
 public class WatermarkManager {
+  // The maximum number of updates to apply in #tryApplyPendingUpdates
+  private static final int MAX_INCREMENTAL_UPDATES = 10;
+
+
   /**
    * The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a
    * {@link PCollection}.
@@ -203,7 +204,7 @@ public class WatermarkManager {
    */
   private static class AppliedPTransformInputWatermark implements Watermark {
     private final Collection<? extends Watermark> inputWatermarks;
-    private final SortedMultiset<WindowedValue<?>> pendingElements;
+    private final NavigableSet<CommittedBundle<?>> pendingElements;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
 
     private AtomicReference<Instant> currentWatermark;
@@ -213,10 +214,10 @@ public class WatermarkManager {
       // The ordering must order elements by timestamp, and must not compare two distinct elements
       // as equal. This is built on the assumption that any element added as a pending element will
       // be consumed without modifications.
-      Ordering<WindowedValue<?>> pendingElementComparator =
-          new WindowedValueByTimestampComparator().compound(Ordering.arbitrary());
+      Ordering<CommittedBundle<?>> pendingBundleComparator =
+          new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
       this.pendingElements =
-          TreeMultiset.create(pendingElementComparator);
+          new TreeSet<>(pendingBundleComparator);
       this.objectTimers = new HashMap<>();
       currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
     }
@@ -249,25 +250,20 @@ public class WatermarkManager {
         minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get());
       }
       if (!pendingElements.isEmpty()) {
-        minInputWatermark = INSTANT_ORDERING.min(
-            minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp());
+        minInputWatermark =
+            INSTANT_ORDERING.min(minInputWatermark, pendingElements.first().getMinTimestamp());
       }
       Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
       currentWatermark.set(newWatermark);
       return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
     }
 
-    private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) {
-      for (WindowedValue<?> pendingElement : newPending) {
-        pendingElements.add(pendingElement);
-      }
+    private synchronized void addPending(CommittedBundle<?> newPending) {
+      pendingElements.add(newPending);
     }
 
-    private synchronized void removePendingElements(
-        Iterable<? extends WindowedValue<?>> finishedElements) {
-      for (WindowedValue<?> finishedElement : finishedElements) {
-        pendingElements.remove(finishedElement);
-      }
+    private synchronized void removePending(CommittedBundle<?> completed) {
+      pendingElements.remove(completed);
     }
 
     private synchronized void updateTimers(TimerUpdate update) {
@@ -856,7 +852,7 @@ public class WatermarkManager {
   private void tryApplyPendingUpdates() {
     if (refreshLock.tryLock()) {
       try {
-        applyNUpdates(10);
+        applyNUpdates(MAX_INCREMENTAL_UPDATES);
       } finally {
         refreshLock.unlock();
       }
@@ -867,7 +863,7 @@ public class WatermarkManager {
    * Applies all pending updates to this {@link WatermarkManager}, causing the pending state
    * of all {@link TransformWatermarks} to be advanced as far as possible.
    */
-  private void applyPendingUpdates() {
+  private void applyAllPendingUpdates() {
     refreshLock.lock();
     try {
       applyNUpdates(-1);
@@ -944,7 +940,7 @@ public class WatermarkManager {
   synchronized void refreshAll() {
     refreshLock.lock();
     try {
-      applyPendingUpdates();
+      applyAllPendingUpdates();
       Set<AppliedPTransform<?, ?, ?>> toRefresh = pendingRefreshes;
       while (!toRefresh.isEmpty()) {
         toRefresh = refreshAllOf(toRefresh);
@@ -1180,12 +1176,12 @@ public class WatermarkManager {
     }
 
     private void removePending(CommittedBundle<?> bundle) {
-      inputWatermark.removePendingElements(bundle.getElements());
+      inputWatermark.removePending(bundle);
       synchronizedProcessingInputWatermark.removePending(bundle);
     }
 
     private void addPending(CommittedBundle<?> bundle) {
-      inputWatermark.addPendingElements(bundle.getElements());
+      inputWatermark.addPending(bundle);
       synchronizedProcessingInputWatermark.addPending(bundle);
     }
 
@@ -1434,11 +1430,11 @@ public class WatermarkManager {
     }
   }
 
-  private static class WindowedValueByTimestampComparator extends Ordering<WindowedValue<?>> {
+  private static class BundleByElementTimestampComparator extends Ordering<CommittedBundle<?>> {
     @Override
-    public int compare(WindowedValue<?> o1, WindowedValue<?> o2) {
+    public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
       return ComparisonChain.start()
-          .compare(o1.getTimestamp(), o2.getTimestamp())
+          .compare(o1.getMinTimestamp(), o2.getMinTimestamp())
           .result();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index 4a7477f..a36c408 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
@@ -35,6 +36,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -95,15 +97,25 @@ public class ImmutableListBundleFactoryTest {
   afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
     UncommittedBundle<T> bundle = bundleFactory.createRootBundle();
     Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>();
+    Instant minElementTs = BoundedWindow.TIMESTAMP_MAX_VALUE;
     for (WindowedValue<T> elem : elems) {
       bundle.add(elem);
       expectations.add(equalTo(elem));
+      if (elem.getTimestamp().isBefore(minElementTs)) {
+        minElementTs = elem.getTimestamp();
+      }
     }
     Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
         Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
-    CommittedBundle<T> committed = bundle.commit(Instant.now());
+    Instant commitTime = Instant.now();
+    CommittedBundle<T> committed = bundle.commit(commitTime);
     assertThat(committed.getElements(), containsMatcher);
 
+    // Sanity check that the test is meaningful.
+    assertThat(minElementTs, not(equalTo(commitTime)));
+    assertThat(committed.getMinTimestamp(), equalTo(minElementTs));
+    assertThat(committed.getSynchronizedProcessingOutputWatermark(), equalTo(commitTime));
+
     return committed;
   }
 
@@ -149,6 +161,7 @@ public class ImmutableListBundleFactoryTest {
     assertThat(
         withed.getSynchronizedProcessingOutputWatermark(),
         equalTo(committed.getSynchronizedProcessingOutputWatermark()));
+    assertThat(withed.getMinTimestamp(), equalTo(new Instant(2048L)));
   }
 
   @Test


[3/3] incubator-beam git commit: Incrementally update Pending elements when work completes

Posted by tg...@apache.org.
Incrementally update Pending elements when work completes

This reduces the number of single-threaded updates the monitor thread
performs before firing timers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/317b5e65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/317b5e65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/317b5e65

Branch: refs/heads/master
Commit: 317b5e6577a623fa8fddeac90e6a3c9510a250e5
Parents: 9de9ce6
Author: Thomas Groh <tg...@google.com>
Authored: Fri Nov 4 11:28:03 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 15:47:02 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/WatermarkManager.java   | 109 ++++++++++++++-----
 1 file changed, 83 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/317b5e65/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f01c13c..2228cd5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -43,7 +43,10 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -682,10 +685,16 @@ public class WatermarkManager {
   private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates;
 
   /**
+   * A lock used to control concurrency for updating pending values.
+   */
+  private final Lock refreshLock;
+
+  /**
    * A queue of pending {@link AppliedPTransform AppliedPTransforms} that have potentially
    * stale data.
    */
-  private final ConcurrentLinkedQueue<AppliedPTransform<?, ?, ?>> pendingRefreshes;
+  @GuardedBy("refreshLock")
+  private final Set<AppliedPTransform<?, ?, ?>> pendingRefreshes;
 
   /**
    * Creates a new {@link WatermarkManager}. All watermarks within the newly created
@@ -710,7 +719,9 @@ public class WatermarkManager {
     this.clock = clock;
     this.consumers = consumers;
     this.pendingUpdates = new ConcurrentLinkedQueue<>();
-    this.pendingRefreshes = new ConcurrentLinkedQueue<>();
+
+    this.refreshLock = new ReentrantLock();
+    this.pendingRefreshes = new HashSet<>();
 
     transformToWatermarks = new HashMap<>();
 
@@ -795,13 +806,18 @@ public class WatermarkManager {
 
   public void initialize(
       Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> initialBundles) {
-    for (Map.Entry<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> rootEntry :
-        initialBundles.entrySet()) {
-      TransformWatermarks rootWms = transformToWatermarks.get(rootEntry.getKey());
-      for (CommittedBundle<?> initialBundle : rootEntry.getValue()) {
-        rootWms.addPending(initialBundle);
+    refreshLock.lock();
+    try {
+      for (Map.Entry<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> rootEntry :
+          initialBundles.entrySet()) {
+        TransformWatermarks rootWms = transformToWatermarks.get(rootEntry.getKey());
+        for (CommittedBundle<?> initialBundle : rootEntry.getValue()) {
+          rootWms.addPending(initialBundle);
+        }
+        pendingRefreshes.add(rootEntry.getKey());
       }
-      pendingRefreshes.offer(rootEntry.getKey());
+    } finally {
+      refreshLock.unlock();
     }
   }
 
@@ -834,6 +850,17 @@ public class WatermarkManager {
         timerUpdate,
         result,
         earliestHold));
+    tryApplyPendingUpdates();
+  }
+
+  private void tryApplyPendingUpdates() {
+    if (refreshLock.tryLock()) {
+      try {
+        applyNUpdates(10);
+      } finally {
+        refreshLock.unlock();
+      }
+    }
   }
 
   /**
@@ -841,14 +868,24 @@ public class WatermarkManager {
    * of all {@link TransformWatermarks} to be advanced as far as possible.
    */
   private void applyPendingUpdates() {
-    Set<AppliedPTransform<?, ?, ?>> updatedTransforms = new HashSet<>();
-    PendingWatermarkUpdate pending = pendingUpdates.poll();
-    while (pending != null) {
+    refreshLock.lock();
+    try {
+      applyNUpdates(-1);
+    } finally {
+      refreshLock.unlock();
+    }
+  }
+
+  @GuardedBy("refreshLock")
+  /**
+   * Applies up to {@code numUpdates}, or all available updates if numUpdates is non-positive.
+   */
+  private void applyNUpdates(int numUpdates) {
+    for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates <= 0); i++) {
+      PendingWatermarkUpdate pending = pendingUpdates.poll();
       applyPendingUpdate(pending);
-      updatedTransforms.add(pending.getTransform());
-      pending = pendingUpdates.poll();
+      pendingRefreshes.add(pending.getTransform());
     }
-    pendingRefreshes.addAll(updatedTransforms);
   }
 
   private void applyPendingUpdate(PendingWatermarkUpdate pending) {
@@ -905,22 +942,37 @@ public class WatermarkManager {
    * watermarks to be advanced as far as possible.
    */
   synchronized void refreshAll() {
-    applyPendingUpdates();
-    while (!pendingRefreshes.isEmpty()) {
-      refreshWatermarks(pendingRefreshes.poll());
+    refreshLock.lock();
+    try {
+      applyPendingUpdates();
+      Set<AppliedPTransform<?, ?, ?>> toRefresh = pendingRefreshes;
+      while (!toRefresh.isEmpty()) {
+        toRefresh = refreshAllOf(toRefresh);
+      }
+    } finally {
+      refreshLock.unlock();
     }
   }
 
-  private void refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
+  private Set<AppliedPTransform<?, ?, ?>> refreshAllOf(Set<AppliedPTransform<?, ?, ?>> toRefresh) {
+    Set<AppliedPTransform<?, ?, ?>> newRefreshes = new HashSet<>();
+    for (AppliedPTransform<?, ?, ?> transform : toRefresh) {
+      newRefreshes.addAll(refreshWatermarks(transform));
+    }
+    return newRefreshes;
+  }
+
+  private Set<AppliedPTransform<?, ?, ?>> refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
     TransformWatermarks myWatermarks = transformToWatermarks.get(toRefresh);
     WatermarkUpdate updateResult = myWatermarks.refresh();
-    Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
     if (updateResult.isAdvanced()) {
+      Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
       for (PValue outputPValue : toRefresh.getOutput().expand()) {
         additionalRefreshes.addAll(consumers.get(outputPValue));
       }
+      return additionalRefreshes;
     }
-    pendingRefreshes.addAll(additionalRefreshes);
+    return Collections.emptySet();
   }
 
   /**
@@ -929,12 +981,17 @@ public class WatermarkManager {
    */
   public Collection<FiredTimers> extractFiredTimers() {
     Collection<FiredTimers> allTimers = new ArrayList<>();
-    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
-        transformToWatermarks.entrySet()) {
-      Collection<FiredTimers> firedTimers = watermarksEntry.getValue().extractFiredTimers();
-      allTimers.addAll(firedTimers);
+    refreshLock.lock();
+    try {
+      for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry
+          : transformToWatermarks.entrySet()) {
+        Collection<FiredTimers> firedTimers = watermarksEntry.getValue().extractFiredTimers();
+        allTimers.addAll(firedTimers);
+      }
+      return allTimers;
+    } finally {
+      refreshLock.unlock();
     }
-    return allTimers;
   }
 
   /**
@@ -1032,7 +1089,7 @@ public class WatermarkManager {
      * Removes the hold of the provided key.
      */
     public void removeHold(Object key) {
-      KeyedHold oldHold = keyedHolds.get(key);
+      KeyedHold oldHold = keyedHolds.remove(key);
       if (oldHold != null) {
         allHolds.remove(oldHold);
       }