You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/11 18:59:40 UTC
[3/3] incubator-beam git commit: Use a Multiset to track Pending Bundles
Use a Multiset to track Pending Bundles
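A bundle may be consumed by the same Flatten multiple times (for example, when a PCollection is
flattened with itself). The watermark hold caused by such a bundle must be removed only when the
last of those consumptions completes, not when the first one does.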
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b97cdb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b97cdb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b97cdb7
Branch: refs/heads/master
Commit: 4b97cdb73882083474a3479198b6dfda6f09bb5f
Parents: 4891784
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 10 11:03:34 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Nov 11 10:58:52 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/WatermarkManager.java | 9 ++-
.../ConsumerTrackingPipelineVisitorTest.java | 17 ++++
.../runners/direct/WatermarkManagerTest.java | 83 ++++++++++++++++++++
3 files changed, 106 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b97cdb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f235af0..fe2c2e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
+import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.TreeMultiset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -204,7 +206,7 @@ public class WatermarkManager {
*/
private static class AppliedPTransformInputWatermark implements Watermark {
private final Collection<? extends Watermark> inputWatermarks;
- private final NavigableSet<CommittedBundle<?>> pendingElements;
+ private final SortedMultiset<CommittedBundle<?>> pendingElements;
private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
private AtomicReference<Instant> currentWatermark;
@@ -217,7 +219,7 @@ public class WatermarkManager {
Ordering<CommittedBundle<?>> pendingBundleComparator =
new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
this.pendingElements =
- new TreeSet<>(pendingBundleComparator);
+ TreeMultiset.create(pendingBundleComparator);
this.objectTimers = new HashMap<>();
currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@@ -251,7 +253,8 @@ public class WatermarkManager {
}
if (!pendingElements.isEmpty()) {
minInputWatermark =
- INSTANT_ORDERING.min(minInputWatermark, pendingElements.first().getMinTimestamp());
+ INSTANT_ORDERING.min(
+ minInputWatermark, pendingElements.firstEntry().getElement().getMinTimestamp());
}
Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
currentWatermark.set(newWatermark);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b97cdb7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
index e8f2a7e..f7f4b71 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
@@ -133,6 +133,23 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
}
@Test
+ public void getValueToConsumersWithDuplicateInputSucceeds() {
+ PCollection<String> created = p.apply(Create.of("1", "2", "3"));
+ // Flatten 'created' with itself so that the same Flatten consumes it twice.
+ PCollection<String> flattened =
+ PCollectionList.of(created).and(created).apply(Flatten.<String>pCollections());
+
+ p.traverseTopologically(visitor);
+ // Expect the Flatten once per consumption of 'created'; its own output has no consumers.
+ assertThat(
+ visitor.getValueToConsumers().get(created),
+ Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
+ flattened.getProducingTransformInternal(),
+ flattened.getProducingTransformInternal()));
+ assertThat(visitor.getValueToConsumers().get(flattened), emptyIterable());
+ }
+
+ @Test
public void getUnfinalizedPValuesContainsDanglingOutputs() {
PCollection<String> created = p.apply(Create.of("1", "2", "3"));
PCollection<String> transformed =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b97cdb7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 6bde462..2e8ab84 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.io.Serializable;
import java.util.Collection;
@@ -295,6 +296,88 @@ public class WatermarkManagerTest implements Serializable {
}
/**
+ * Demonstrates that a transform consuming the same input twice keeps its input watermark held
+ * by a pending bundle until both consumptions of that bundle have completed.
+ */
+ @Test
+ public void getWatermarkMultiIdenticalInput() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+ PCollection<Integer> multiConsumer =
+ PCollectionList.of(created).and(created).apply(Flatten.<Integer>pCollections());
+ AppliedPTransform<?, ?, ?> theFlatten = multiConsumer.getProducingTransformInternal();
+ // theFlatten is registered twice as a consumer of 'created', once per Flatten input.
+ Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers =
+ ImmutableMap.<PValue, Collection<AppliedPTransform<?, ?, ?>>>builder()
+ .put(created, ImmutableList.<AppliedPTransform<?, ?, ?>>of(theFlatten, theFlatten))
+ .put(multiConsumer, Collections.<AppliedPTransform<?, ?, ?>>emptyList())
+ .build();
+
+ WatermarkManager tstMgr =
+ WatermarkManager.create(
+ clock,
+ Collections.<AppliedPTransform<?, ?, ?>>singleton(
+ created.getProducingTransformInternal()),
+ valueToConsumers);
+ CommittedBundle<Void> root =
+ bundleFactory
+ .<Void>createRootBundle()
+ .add(WindowedValue.<Void>valueInGlobalWindow(null))
+ .commit(clock.now());
+ CommittedBundle<Integer> createBundle =
+ bundleFactory
+ .createBundle(created)
+ .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(33536)))
+ .commit(clock.now());
+
+ Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> initialInputs =
+ ImmutableMap.<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>>builder()
+ .put(
+ created.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(root))
+ .build();
+ tstMgr.initialize(initialInputs);
+ tstMgr.updateWatermarks(
+ root,
+ TimerUpdate.empty(),
+ CommittedResult.create(
+ StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(),
+ root.withElements(Collections.<WindowedValue<Void>>emptyList()),
+ Collections.singleton(createBundle),
+ EnumSet.allOf(OutputType.class)),
+ BoundedWindow.TIMESTAMP_MAX_VALUE);
+ // createBundle is now pending at the Flatten, so it holds the Flatten's input watermark.
+ tstMgr.refreshAll();
+ TransformWatermarks flattenWms = tstMgr.getWatermarks(theFlatten);
+ assertThat(flattenWms.getInputWatermark(), equalTo(new Instant(33536)));
+ // Complete the first of the Flatten's two consumptions of createBundle.
+ tstMgr.updateWatermarks(
+ createBundle,
+ TimerUpdate.empty(),
+ CommittedResult.create(
+ StepTransformResult.withoutHold(theFlatten).build(),
+ createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+ Collections.<CommittedBundle<?>>emptyList(),
+ EnumSet.allOf(OutputType.class)),
+ BoundedWindow.TIMESTAMP_MAX_VALUE);
+ // The other consumption is still pending, so the watermark must remain held.
+ tstMgr.refreshAll();
+ assertThat(flattenWms.getInputWatermark(), equalTo(new Instant(33536)));
+ // Complete the second consumption; only then is the hold on the watermark released.
+ tstMgr.updateWatermarks(
+ createBundle,
+ TimerUpdate.empty(),
+ CommittedResult.create(
+ StepTransformResult.withoutHold(theFlatten).build(),
+ createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+ Collections.<CommittedBundle<?>>emptyList(),
+ EnumSet.allOf(OutputType.class)),
+ BoundedWindow.TIMESTAMP_MAX_VALUE);
+ tstMgr.refreshAll();
+ assertThat(flattenWms.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ }
+
+ /**
* Demonstrates that pending elements are independent among
* {@link AppliedPTransform AppliedPTransforms} that consume the same input {@link PCollection}.
*/