You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/07 23:48:30 UTC
[1/3] incubator-beam git commit: This closes #1287
Repository: incubator-beam
Updated Branches:
refs/heads/master 9de9ce69f -> c6d9bf297
This closes #1287
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c6d9bf29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c6d9bf29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c6d9bf29
Branch: refs/heads/master
Commit: c6d9bf29700d6f13a33423183201a18525040e05
Parents: 9de9ce6 a58f1eb
Author: Thomas Groh <tg...@google.com>
Authored: Mon Nov 7 15:47:02 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 15:47:02 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 5 +
.../direct/ImmutableListBundleFactory.java | 21 ++-
.../beam/runners/direct/WatermarkManager.java | 153 +++++++++++++------
.../direct/ImmutableListBundleFactoryTest.java | 15 +-
4 files changed, 141 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Track Minimum Element Timestamps within Bundles
Posted by tg...@apache.org.
Track Minimum Element Timestamps within Bundles
This allows the Watermark Manager to track pending elements by bundles
of elements rather than per-element, which significantly reduces the
amount of work done per-element to track watermarks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a58f1eba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a58f1eba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a58f1eba
Branch: refs/heads/master
Commit: a58f1eba1a3b5ae453f4d65ad409785cb717b2ae
Parents: 317b5e6
Author: Thomas Groh <tg...@google.com>
Authored: Fri Nov 4 12:54:00 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 15:47:02 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 5 ++
.../direct/ImmutableListBundleFactory.java | 21 ++++++++-
.../beam/runners/direct/WatermarkManager.java | 48 +++++++++-----------
.../direct/ImmutableListBundleFactoryTest.java | 15 +++++-
4 files changed, 60 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 44d1986..4d5a449 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -151,6 +151,11 @@ public class DirectRunner
Iterable<WindowedValue<T>> getElements();
/**
+ * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}.
+ */
+ Instant getMinTimestamp();
+
+ /**
* Returns the processing time output watermark at the time the producing {@link PTransform}
* committed this bundle. Downstream synchronized processing time watermarks cannot progress
* past this point before consuming this bundle.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index abc6dd8..6b342d6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
@@ -64,6 +65,7 @@ class ImmutableListBundleFactory implements BundleFactory {
private final StructuralKey<?> key;
private boolean committed = false;
private ImmutableList.Builder<WindowedValue<T>> elements;
+ private Instant minSoFar = BoundedWindow.TIMESTAMP_MAX_VALUE;
/**
* Create a new {@link UncommittedImmutableListBundle} for the specified {@link PCollection}.
@@ -93,6 +95,9 @@ class ImmutableListBundleFactory implements BundleFactory {
element,
pcollection);
elements.add(element);
+ if (element.getTimestamp().isBefore(minSoFar)) {
+ minSoFar = element.getTimestamp();
+ }
return this;
}
@@ -102,7 +107,7 @@ class ImmutableListBundleFactory implements BundleFactory {
committed = true;
final Iterable<WindowedValue<T>> committedElements = elements.build();
return CommittedImmutableListBundle.create(
- pcollection, key, committedElements, synchronizedCompletionTime);
+ pcollection, key, committedElements, minSoFar, synchronizedCompletionTime);
}
}
@@ -112,9 +117,10 @@ class ImmutableListBundleFactory implements BundleFactory {
@Nullable PCollection<T> pcollection,
StructuralKey<?> key,
Iterable<WindowedValue<T>> committedElements,
+ Instant minElementTimestamp,
Instant synchronizedCompletionTime) {
return new AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle<>(
- pcollection, key, committedElements, synchronizedCompletionTime);
+ pcollection, key, committedElements, minElementTimestamp, synchronizedCompletionTime);
}
@Override
@@ -123,6 +129,7 @@ class ImmutableListBundleFactory implements BundleFactory {
getPCollection(),
getKey(),
ImmutableList.copyOf(elements),
+ minTimestamp(elements),
getSynchronizedProcessingOutputWatermark());
}
@@ -136,4 +143,14 @@ class ImmutableListBundleFactory implements BundleFactory {
return this == obj;
}
}
+
+ private static Instant minTimestamp(Iterable<? extends WindowedValue<?>> elements) {
+ Instant minTs = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ for (WindowedValue<?> element : elements) {
+ if (element.getTimestamp().isBefore(minTs)) {
+ minTs = element.getTimestamp();
+ }
+ }
+ return minTs;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 2228cd5..f235af0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -27,8 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.TreeMultiset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -55,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.joda.time.Instant;
@@ -123,6 +120,10 @@ import org.joda.time.Instant;
* </pre>
*/
public class WatermarkManager {
+ // The number of updates to apply in #tryApplyPendingUpdates
+ private static final int MAX_INCREMENTAL_UPDATES = 10;
+
+
/**
* The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a
* {@link PCollection}.
@@ -203,7 +204,7 @@ public class WatermarkManager {
*/
private static class AppliedPTransformInputWatermark implements Watermark {
private final Collection<? extends Watermark> inputWatermarks;
- private final SortedMultiset<WindowedValue<?>> pendingElements;
+ private final NavigableSet<CommittedBundle<?>> pendingElements;
private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
private AtomicReference<Instant> currentWatermark;
@@ -213,10 +214,10 @@ public class WatermarkManager {
// The ordering must order elements by timestamp, and must not compare two distinct elements
// as equal. This is built on the assumption that any element added as a pending element will
// be consumed without modifications.
- Ordering<WindowedValue<?>> pendingElementComparator =
- new WindowedValueByTimestampComparator().compound(Ordering.arbitrary());
+ Ordering<CommittedBundle<?>> pendingBundleComparator =
+ new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
this.pendingElements =
- TreeMultiset.create(pendingElementComparator);
+ new TreeSet<>(pendingBundleComparator);
this.objectTimers = new HashMap<>();
currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@@ -249,25 +250,20 @@ public class WatermarkManager {
minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get());
}
if (!pendingElements.isEmpty()) {
- minInputWatermark = INSTANT_ORDERING.min(
- minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp());
+ minInputWatermark =
+ INSTANT_ORDERING.min(minInputWatermark, pendingElements.first().getMinTimestamp());
}
Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
currentWatermark.set(newWatermark);
return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
}
- private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) {
- for (WindowedValue<?> pendingElement : newPending) {
- pendingElements.add(pendingElement);
- }
+ private synchronized void addPending(CommittedBundle<?> newPending) {
+ pendingElements.add(newPending);
}
- private synchronized void removePendingElements(
- Iterable<? extends WindowedValue<?>> finishedElements) {
- for (WindowedValue<?> finishedElement : finishedElements) {
- pendingElements.remove(finishedElement);
- }
+ private synchronized void removePending(CommittedBundle<?> completed) {
+ pendingElements.remove(completed);
}
private synchronized void updateTimers(TimerUpdate update) {
@@ -856,7 +852,7 @@ public class WatermarkManager {
private void tryApplyPendingUpdates() {
if (refreshLock.tryLock()) {
try {
- applyNUpdates(10);
+ applyNUpdates(MAX_INCREMENTAL_UPDATES);
} finally {
refreshLock.unlock();
}
@@ -867,7 +863,7 @@ public class WatermarkManager {
* Applies all pending updates to this {@link WatermarkManager}, causing the pending state
* of all {@link TransformWatermarks} to be advanced as far as possible.
*/
- private void applyPendingUpdates() {
+ private void applyAllPendingUpdates() {
refreshLock.lock();
try {
applyNUpdates(-1);
@@ -944,7 +940,7 @@ public class WatermarkManager {
synchronized void refreshAll() {
refreshLock.lock();
try {
- applyPendingUpdates();
+ applyAllPendingUpdates();
Set<AppliedPTransform<?, ?, ?>> toRefresh = pendingRefreshes;
while (!toRefresh.isEmpty()) {
toRefresh = refreshAllOf(toRefresh);
@@ -1180,12 +1176,12 @@ public class WatermarkManager {
}
private void removePending(CommittedBundle<?> bundle) {
- inputWatermark.removePendingElements(bundle.getElements());
+ inputWatermark.removePending(bundle);
synchronizedProcessingInputWatermark.removePending(bundle);
}
private void addPending(CommittedBundle<?> bundle) {
- inputWatermark.addPendingElements(bundle.getElements());
+ inputWatermark.addPending(bundle);
synchronizedProcessingInputWatermark.addPending(bundle);
}
@@ -1434,11 +1430,11 @@ public class WatermarkManager {
}
}
- private static class WindowedValueByTimestampComparator extends Ordering<WindowedValue<?>> {
+ private static class BundleByElementTimestampComparator extends Ordering<CommittedBundle<?>> {
@Override
- public int compare(WindowedValue<?> o1, WindowedValue<?> o2) {
+ public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
return ComparisonChain.start()
- .compare(o1.getTimestamp(), o2.getTimestamp())
+ .compare(o1.getMinTimestamp(), o2.getMinTimestamp())
.result();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index 4a7477f..a36c408 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
@@ -35,6 +36,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -95,15 +97,25 @@ public class ImmutableListBundleFactoryTest {
afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
UncommittedBundle<T> bundle = bundleFactory.createRootBundle();
Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>();
+ Instant minElementTs = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (WindowedValue<T> elem : elems) {
bundle.add(elem);
expectations.add(equalTo(elem));
+ if (elem.getTimestamp().isBefore(minElementTs)) {
+ minElementTs = elem.getTimestamp();
+ }
}
Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
- CommittedBundle<T> committed = bundle.commit(Instant.now());
+ Instant commitTime = Instant.now();
+ CommittedBundle<T> committed = bundle.commit(commitTime);
assertThat(committed.getElements(), containsMatcher);
+ // Sanity check that the test is meaningful.
+ assertThat(minElementTs, not(equalTo(commitTime)));
+ assertThat(committed.getMinTimestamp(), equalTo(minElementTs));
+ assertThat(committed.getSynchronizedProcessingOutputWatermark(), equalTo(commitTime));
+
return committed;
}
@@ -149,6 +161,7 @@ public class ImmutableListBundleFactoryTest {
assertThat(
withed.getSynchronizedProcessingOutputWatermark(),
equalTo(committed.getSynchronizedProcessingOutputWatermark()));
+ assertThat(withed.getMinTimestamp(), equalTo(new Instant(2048L)));
}
@Test
[3/3] incubator-beam git commit: Incrementally update Pending elements when work completes
Posted by tg...@apache.org.
Incrementally update Pending elements when work completes
This reduces the number of single-threaded updates the monitor thread
performs before firing timers.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/317b5e65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/317b5e65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/317b5e65
Branch: refs/heads/master
Commit: 317b5e6577a623fa8fddeac90e6a3c9510a250e5
Parents: 9de9ce6
Author: Thomas Groh <tg...@google.com>
Authored: Fri Nov 4 11:28:03 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 15:47:02 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/WatermarkManager.java | 109 ++++++++++++++-----
1 file changed, 83 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/317b5e65/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f01c13c..2228cd5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -43,7 +43,10 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -682,10 +685,16 @@ public class WatermarkManager {
private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates;
/**
+ * A lock used to control concurrency for updating pending values.
+ */
+ private final Lock refreshLock;
+
+ /**
* A queue of pending {@link AppliedPTransform AppliedPTransforms} that have potentially
* stale data.
*/
- private final ConcurrentLinkedQueue<AppliedPTransform<?, ?, ?>> pendingRefreshes;
+ @GuardedBy("refreshLock")
+ private final Set<AppliedPTransform<?, ?, ?>> pendingRefreshes;
/**
* Creates a new {@link WatermarkManager}. All watermarks within the newly created
@@ -710,7 +719,9 @@ public class WatermarkManager {
this.clock = clock;
this.consumers = consumers;
this.pendingUpdates = new ConcurrentLinkedQueue<>();
- this.pendingRefreshes = new ConcurrentLinkedQueue<>();
+
+ this.refreshLock = new ReentrantLock();
+ this.pendingRefreshes = new HashSet<>();
transformToWatermarks = new HashMap<>();
@@ -795,13 +806,18 @@ public class WatermarkManager {
public void initialize(
Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> initialBundles) {
- for (Map.Entry<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> rootEntry :
- initialBundles.entrySet()) {
- TransformWatermarks rootWms = transformToWatermarks.get(rootEntry.getKey());
- for (CommittedBundle<?> initialBundle : rootEntry.getValue()) {
- rootWms.addPending(initialBundle);
+ refreshLock.lock();
+ try {
+ for (Map.Entry<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> rootEntry :
+ initialBundles.entrySet()) {
+ TransformWatermarks rootWms = transformToWatermarks.get(rootEntry.getKey());
+ for (CommittedBundle<?> initialBundle : rootEntry.getValue()) {
+ rootWms.addPending(initialBundle);
+ }
+ pendingRefreshes.add(rootEntry.getKey());
}
- pendingRefreshes.offer(rootEntry.getKey());
+ } finally {
+ refreshLock.unlock();
}
}
@@ -834,6 +850,17 @@ public class WatermarkManager {
timerUpdate,
result,
earliestHold));
+ tryApplyPendingUpdates();
+ }
+
+ private void tryApplyPendingUpdates() {
+ if (refreshLock.tryLock()) {
+ try {
+ applyNUpdates(10);
+ } finally {
+ refreshLock.unlock();
+ }
+ }
}
/**
@@ -841,14 +868,24 @@ public class WatermarkManager {
* of all {@link TransformWatermarks} to be advanced as far as possible.
*/
private void applyPendingUpdates() {
- Set<AppliedPTransform<?, ?, ?>> updatedTransforms = new HashSet<>();
- PendingWatermarkUpdate pending = pendingUpdates.poll();
- while (pending != null) {
+ refreshLock.lock();
+ try {
+ applyNUpdates(-1);
+ } finally {
+ refreshLock.unlock();
+ }
+ }
+
+ @GuardedBy("refreshLock")
+ /**
+ * Applies up to {@code numUpdates}, or all available updates if numUpdates is non-positive.
+ */
+ private void applyNUpdates(int numUpdates) {
+ for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates <= 0); i++) {
+ PendingWatermarkUpdate pending = pendingUpdates.poll();
applyPendingUpdate(pending);
- updatedTransforms.add(pending.getTransform());
- pending = pendingUpdates.poll();
+ pendingRefreshes.add(pending.getTransform());
}
- pendingRefreshes.addAll(updatedTransforms);
}
private void applyPendingUpdate(PendingWatermarkUpdate pending) {
@@ -905,22 +942,37 @@ public class WatermarkManager {
* watermarks to be advanced as far as possible.
*/
synchronized void refreshAll() {
- applyPendingUpdates();
- while (!pendingRefreshes.isEmpty()) {
- refreshWatermarks(pendingRefreshes.poll());
+ refreshLock.lock();
+ try {
+ applyPendingUpdates();
+ Set<AppliedPTransform<?, ?, ?>> toRefresh = pendingRefreshes;
+ while (!toRefresh.isEmpty()) {
+ toRefresh = refreshAllOf(toRefresh);
+ }
+ } finally {
+ refreshLock.unlock();
}
}
- private void refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
+ private Set<AppliedPTransform<?, ?, ?>> refreshAllOf(Set<AppliedPTransform<?, ?, ?>> toRefresh) {
+ Set<AppliedPTransform<?, ?, ?>> newRefreshes = new HashSet<>();
+ for (AppliedPTransform<?, ?, ?> transform : toRefresh) {
+ newRefreshes.addAll(refreshWatermarks(transform));
+ }
+ return newRefreshes;
+ }
+
+ private Set<AppliedPTransform<?, ?, ?>> refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
TransformWatermarks myWatermarks = transformToWatermarks.get(toRefresh);
WatermarkUpdate updateResult = myWatermarks.refresh();
- Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
if (updateResult.isAdvanced()) {
+ Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
for (PValue outputPValue : toRefresh.getOutput().expand()) {
additionalRefreshes.addAll(consumers.get(outputPValue));
}
+ return additionalRefreshes;
}
- pendingRefreshes.addAll(additionalRefreshes);
+ return Collections.emptySet();
}
/**
@@ -929,12 +981,17 @@ public class WatermarkManager {
*/
public Collection<FiredTimers> extractFiredTimers() {
Collection<FiredTimers> allTimers = new ArrayList<>();
- for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
- transformToWatermarks.entrySet()) {
- Collection<FiredTimers> firedTimers = watermarksEntry.getValue().extractFiredTimers();
- allTimers.addAll(firedTimers);
+ refreshLock.lock();
+ try {
+ for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry
+ : transformToWatermarks.entrySet()) {
+ Collection<FiredTimers> firedTimers = watermarksEntry.getValue().extractFiredTimers();
+ allTimers.addAll(firedTimers);
+ }
+ return allTimers;
+ } finally {
+ refreshLock.unlock();
}
- return allTimers;
}
/**
@@ -1032,7 +1089,7 @@ public class WatermarkManager {
* Removes the hold of the provided key.
*/
public void removeHold(Object key) {
- KeyedHold oldHold = keyedHolds.get(key);
+ KeyedHold oldHold = keyedHolds.remove(key);
if (oldHold != null) {
allHolds.remove(oldHold);
}