You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/07 23:48:31 UTC
[2/3] incubator-beam git commit: Track Minimum Element Timestamps within Bundles
Track Minimum Element Timestamps within Bundles
This allows the Watermark Manager to track pending elements per bundle
rather than per element, which significantly reduces the amount of work
done for each element to track watermarks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a58f1eba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a58f1eba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a58f1eba
Branch: refs/heads/master
Commit: a58f1eba1a3b5ae453f4d65ad409785cb717b2ae
Parents: 317b5e6
Author: Thomas Groh <tg...@google.com>
Authored: Fri Nov 4 12:54:00 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 15:47:02 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 5 ++
.../direct/ImmutableListBundleFactory.java | 21 ++++++++-
.../beam/runners/direct/WatermarkManager.java | 48 +++++++++-----------
.../direct/ImmutableListBundleFactoryTest.java | 15 +++++-
4 files changed, 60 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 44d1986..4d5a449 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -151,6 +151,11 @@ public class DirectRunner
Iterable<WindowedValue<T>> getElements();
/**
+ * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}.
+ */
+ Instant getMinTimestamp();
+
+ /**
* Returns the processing time output watermark at the time the producing {@link PTransform}
* committed this bundle. Downstream synchronized processing time watermarks cannot progress
* past this point before consuming this bundle.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index abc6dd8..6b342d6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
@@ -64,6 +65,7 @@ class ImmutableListBundleFactory implements BundleFactory {
private final StructuralKey<?> key;
private boolean committed = false;
private ImmutableList.Builder<WindowedValue<T>> elements;
+ private Instant minSoFar = BoundedWindow.TIMESTAMP_MAX_VALUE;
/**
* Create a new {@link UncommittedImmutableListBundle} for the specified {@link PCollection}.
@@ -93,6 +95,9 @@ class ImmutableListBundleFactory implements BundleFactory {
element,
pcollection);
elements.add(element);
+ if (element.getTimestamp().isBefore(minSoFar)) {
+ minSoFar = element.getTimestamp();
+ }
return this;
}
@@ -102,7 +107,7 @@ class ImmutableListBundleFactory implements BundleFactory {
committed = true;
final Iterable<WindowedValue<T>> committedElements = elements.build();
return CommittedImmutableListBundle.create(
- pcollection, key, committedElements, synchronizedCompletionTime);
+ pcollection, key, committedElements, minSoFar, synchronizedCompletionTime);
}
}
@@ -112,9 +117,10 @@ class ImmutableListBundleFactory implements BundleFactory {
@Nullable PCollection<T> pcollection,
StructuralKey<?> key,
Iterable<WindowedValue<T>> committedElements,
+ Instant minElementTimestamp,
Instant synchronizedCompletionTime) {
return new AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle<>(
- pcollection, key, committedElements, synchronizedCompletionTime);
+ pcollection, key, committedElements, minElementTimestamp, synchronizedCompletionTime);
}
@Override
@@ -123,6 +129,7 @@ class ImmutableListBundleFactory implements BundleFactory {
getPCollection(),
getKey(),
ImmutableList.copyOf(elements),
+ minTimestamp(elements),
getSynchronizedProcessingOutputWatermark());
}
@@ -136,4 +143,14 @@ class ImmutableListBundleFactory implements BundleFactory {
return this == obj;
}
}
+
+ private static Instant minTimestamp(Iterable<? extends WindowedValue<?>> elements) {
+ Instant minTs = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ for (WindowedValue<?> element : elements) {
+ if (element.getTimestamp().isBefore(minTs)) {
+ minTs = element.getTimestamp();
+ }
+ }
+ return minTs;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 2228cd5..f235af0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -27,8 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.TreeMultiset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -55,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.joda.time.Instant;
@@ -123,6 +120,10 @@ import org.joda.time.Instant;
* </pre>
*/
public class WatermarkManager {
+ // The number of updates to apply in #tryApplyPendingUpdates
+ private static final int MAX_INCREMENTAL_UPDATES = 10;
+
+
/**
* The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a
* {@link PCollection}.
@@ -203,7 +204,7 @@ public class WatermarkManager {
*/
private static class AppliedPTransformInputWatermark implements Watermark {
private final Collection<? extends Watermark> inputWatermarks;
- private final SortedMultiset<WindowedValue<?>> pendingElements;
+ private final NavigableSet<CommittedBundle<?>> pendingElements;
private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
private AtomicReference<Instant> currentWatermark;
@@ -213,10 +214,10 @@ public class WatermarkManager {
// The ordering must order elements by timestamp, and must not compare two distinct elements
// as equal. This is built on the assumption that any element added as a pending element will
// be consumed without modifications.
- Ordering<WindowedValue<?>> pendingElementComparator =
- new WindowedValueByTimestampComparator().compound(Ordering.arbitrary());
+ Ordering<CommittedBundle<?>> pendingBundleComparator =
+ new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
this.pendingElements =
- TreeMultiset.create(pendingElementComparator);
+ new TreeSet<>(pendingBundleComparator);
this.objectTimers = new HashMap<>();
currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@@ -249,25 +250,20 @@ public class WatermarkManager {
minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get());
}
if (!pendingElements.isEmpty()) {
- minInputWatermark = INSTANT_ORDERING.min(
- minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp());
+ minInputWatermark =
+ INSTANT_ORDERING.min(minInputWatermark, pendingElements.first().getMinTimestamp());
}
Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
currentWatermark.set(newWatermark);
return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
}
- private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) {
- for (WindowedValue<?> pendingElement : newPending) {
- pendingElements.add(pendingElement);
- }
+ private synchronized void addPending(CommittedBundle<?> newPending) {
+ pendingElements.add(newPending);
}
- private synchronized void removePendingElements(
- Iterable<? extends WindowedValue<?>> finishedElements) {
- for (WindowedValue<?> finishedElement : finishedElements) {
- pendingElements.remove(finishedElement);
- }
+ private synchronized void removePending(CommittedBundle<?> completed) {
+ pendingElements.remove(completed);
}
private synchronized void updateTimers(TimerUpdate update) {
@@ -856,7 +852,7 @@ public class WatermarkManager {
private void tryApplyPendingUpdates() {
if (refreshLock.tryLock()) {
try {
- applyNUpdates(10);
+ applyNUpdates(MAX_INCREMENTAL_UPDATES);
} finally {
refreshLock.unlock();
}
@@ -867,7 +863,7 @@ public class WatermarkManager {
* Applies all pending updates to this {@link WatermarkManager}, causing the pending state
* of all {@link TransformWatermarks} to be advanced as far as possible.
*/
- private void applyPendingUpdates() {
+ private void applyAllPendingUpdates() {
refreshLock.lock();
try {
applyNUpdates(-1);
@@ -944,7 +940,7 @@ public class WatermarkManager {
synchronized void refreshAll() {
refreshLock.lock();
try {
- applyPendingUpdates();
+ applyAllPendingUpdates();
Set<AppliedPTransform<?, ?, ?>> toRefresh = pendingRefreshes;
while (!toRefresh.isEmpty()) {
toRefresh = refreshAllOf(toRefresh);
@@ -1180,12 +1176,12 @@ public class WatermarkManager {
}
private void removePending(CommittedBundle<?> bundle) {
- inputWatermark.removePendingElements(bundle.getElements());
+ inputWatermark.removePending(bundle);
synchronizedProcessingInputWatermark.removePending(bundle);
}
private void addPending(CommittedBundle<?> bundle) {
- inputWatermark.addPendingElements(bundle.getElements());
+ inputWatermark.addPending(bundle);
synchronizedProcessingInputWatermark.addPending(bundle);
}
@@ -1434,11 +1430,11 @@ public class WatermarkManager {
}
}
- private static class WindowedValueByTimestampComparator extends Ordering<WindowedValue<?>> {
+ private static class BundleByElementTimestampComparator extends Ordering<CommittedBundle<?>> {
@Override
- public int compare(WindowedValue<?> o1, WindowedValue<?> o2) {
+ public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
return ComparisonChain.start()
- .compare(o1.getTimestamp(), o2.getTimestamp())
+ .compare(o1.getMinTimestamp(), o2.getMinTimestamp())
.result();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index 4a7477f..a36c408 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
@@ -35,6 +36,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -95,15 +97,25 @@ public class ImmutableListBundleFactoryTest {
afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
UncommittedBundle<T> bundle = bundleFactory.createRootBundle();
Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>();
+ Instant minElementTs = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (WindowedValue<T> elem : elems) {
bundle.add(elem);
expectations.add(equalTo(elem));
+ if (elem.getTimestamp().isBefore(minElementTs)) {
+ minElementTs = elem.getTimestamp();
+ }
}
Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
- CommittedBundle<T> committed = bundle.commit(Instant.now());
+ Instant commitTime = Instant.now();
+ CommittedBundle<T> committed = bundle.commit(commitTime);
assertThat(committed.getElements(), containsMatcher);
+ // Sanity check that the test is meaningful.
+ assertThat(minElementTs, not(equalTo(commitTime)));
+ assertThat(committed.getMinTimestamp(), equalTo(minElementTs));
+ assertThat(committed.getSynchronizedProcessingOutputWatermark(), equalTo(commitTime));
+
return committed;
}
@@ -149,6 +161,7 @@ public class ImmutableListBundleFactoryTest {
assertThat(
withed.getSynchronizedProcessingOutputWatermark(),
equalTo(committed.getSynchronizedProcessingOutputWatermark()));
+ assertThat(withed.getMinTimestamp(), equalTo(new Instant(2048L)));
}
@Test