You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/11 18:59:40 UTC

[3/3] incubator-beam git commit: Use a Multiset to track Pending Bundles

Use a Multiset to track Pending Bundles

A bundle may be consumed by a Flatten multiple times. The last instance
completing should remove the hold caused by the bundle, not the first.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b97cdb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b97cdb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b97cdb7

Branch: refs/heads/master
Commit: 4b97cdb73882083474a3479198b6dfda6f09bb5f
Parents: 4891784
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 10 11:03:34 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Nov 11 10:58:52 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/WatermarkManager.java   |  9 ++-
 .../ConsumerTrackingPipelineVisitorTest.java    | 17 ++++
 .../runners/direct/WatermarkManagerTest.java    | 83 ++++++++++++++++++++
 3 files changed, 106 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b97cdb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f235af0..fe2c2e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.TreeMultiset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -204,7 +206,7 @@ public class WatermarkManager {
    */
   private static class AppliedPTransformInputWatermark implements Watermark {
     private final Collection<? extends Watermark> inputWatermarks;
-    private final NavigableSet<CommittedBundle<?>> pendingElements;
+    private final SortedMultiset<CommittedBundle<?>> pendingElements;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
 
     private AtomicReference<Instant> currentWatermark;
@@ -217,7 +219,7 @@ public class WatermarkManager {
       Ordering<CommittedBundle<?>> pendingBundleComparator =
           new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
       this.pendingElements =
-          new TreeSet<>(pendingBundleComparator);
+          TreeMultiset.create(pendingBundleComparator);
       this.objectTimers = new HashMap<>();
       currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
     }
@@ -251,7 +253,8 @@ public class WatermarkManager {
       }
       if (!pendingElements.isEmpty()) {
         minInputWatermark =
-            INSTANT_ORDERING.min(minInputWatermark, pendingElements.first().getMinTimestamp());
+            INSTANT_ORDERING.min(
+                minInputWatermark, pendingElements.firstEntry().getElement().getMinTimestamp());
       }
       Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
       currentWatermark.set(newWatermark);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b97cdb7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
index e8f2a7e..f7f4b71 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
@@ -133,6 +133,23 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
   }
 
   @Test
+  public void getValueToConsumersWithDuplicateInputSucceeds() {
+    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
+
+    PCollection<String> flattened =
+        PCollectionList.of(created).and(created).apply(Flatten.<String>pCollections());
+
+    p.traverseTopologically(visitor);
+
+    assertThat(
+        visitor.getValueToConsumers().get(created),
+        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
+            flattened.getProducingTransformInternal(),
+            flattened.getProducingTransformInternal()));
+    assertThat(visitor.getValueToConsumers().get(flattened), emptyIterable());
+  }
+
+  @Test
   public void getUnfinalizedPValuesContainsDanglingOutputs() {
     PCollection<String> created = p.apply(Create.of("1", "2", "3"));
     PCollection<String> transformed =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b97cdb7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 6bde462..2e8ab84 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.Serializable;
 import java.util.Collection;
@@ -295,6 +296,88 @@ public class WatermarkManagerTest implements Serializable {
   }
 
   /**
+   * Demonstrates that getWatermark for a transform that consumes the same input more than once is
+   * held until every pending instance of each input bundle has been consumed.
+   */
+  @Test
+  public void getWatermarkMultiIdenticalInput() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+    PCollection<Integer> multiConsumer =
+        PCollectionList.of(created).and(created).apply(Flatten.<Integer>pCollections());
+    AppliedPTransform<?, ?, ?> theFlatten = multiConsumer.getProducingTransformInternal();
+
+    Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers =
+        ImmutableMap.<PValue, Collection<AppliedPTransform<?, ?, ?>>>builder()
+            .put(created, ImmutableList.<AppliedPTransform<?, ?, ?>>of(theFlatten, theFlatten))
+            .put(multiConsumer, Collections.<AppliedPTransform<?, ?, ?>>emptyList())
+            .build();
+
+    WatermarkManager tstMgr =
+        WatermarkManager.create(
+            clock,
+            Collections.<AppliedPTransform<?, ?, ?>>singleton(
+                created.getProducingTransformInternal()),
+            valueToConsumers);
+    CommittedBundle<Void> root =
+        bundleFactory
+            .<Void>createRootBundle()
+            .add(WindowedValue.<Void>valueInGlobalWindow(null))
+            .commit(clock.now());
+    CommittedBundle<Integer> createBundle =
+        bundleFactory
+            .createBundle(created)
+            .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(33536)))
+            .commit(clock.now());
+
+    Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> initialInputs =
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>>builder()
+            .put(
+                created.getProducingTransformInternal(),
+                Collections.<CommittedBundle<?>>singleton(root))
+            .build();
+    tstMgr.initialize(initialInputs);
+    tstMgr.updateWatermarks(
+        root,
+        TimerUpdate.empty(),
+        CommittedResult.create(
+            StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(),
+            root.withElements(Collections.<WindowedValue<Void>>emptyList()),
+            Collections.singleton(createBundle),
+            EnumSet.allOf(OutputType.class)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    tstMgr.refreshAll();
+    TransformWatermarks flattenWms = tstMgr.getWatermarks(theFlatten);
+    assertThat(flattenWms.getInputWatermark(), equalTo(new Instant(33536)));
+
+    tstMgr.updateWatermarks(
+        createBundle,
+        TimerUpdate.empty(),
+        CommittedResult.create(
+            StepTransformResult.withoutHold(theFlatten).build(),
+            createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+            Collections.<CommittedBundle<?>>emptyList(),
+            EnumSet.allOf(OutputType.class)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    tstMgr.refreshAll();
+    assertThat(flattenWms.getInputWatermark(), equalTo(new Instant(33536)));
+
+    tstMgr.updateWatermarks(
+        createBundle,
+        TimerUpdate.empty(),
+        CommittedResult.create(
+            StepTransformResult.withoutHold(theFlatten).build(),
+            createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+            Collections.<CommittedBundle<?>>emptyList(),
+            EnumSet.allOf(OutputType.class)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    tstMgr.refreshAll();
+    assertThat(flattenWms.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
+  /**
    * Demonstrates that pending elements are independent among
    * {@link AppliedPTransform AppliedPTransforms} that consume the same input {@link PCollection}.
    */