You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/11 22:41:38 UTC

[1/2] incubator-beam git commit: Only remove Elements that were pending from Pending Elements

Repository: incubator-beam
Updated Branches:
  refs/heads/master 9fab4ba51 -> 7c2124ba4


Only remove Elements that were pending from Pending Elements

Update the comparator so that an element is removed from the
collection of pending elements only if that exact element was previously
added. This ensures that a pending element cannot be removed merely
because a timer delivery has the same timestamp as it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/da57ae81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/da57ae81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/da57ae81

Branch: refs/heads/master
Commit: da57ae814bdc9c8d0661c27906f4a6e3c9e7474f
Parents: ecbc641
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 7 15:50:05 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Oct 7 15:50:05 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/WatermarkManager.java   | 31 +++++------------
 .../runners/direct/WatermarkManagerTest.java    | 36 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da57ae81/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index b3d1fc5..4792c39 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -21,10 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -210,7 +208,13 @@ public class WatermarkManager {
 
     public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
       this.inputWatermarks = inputWatermarks;
-      this.pendingElements = TreeMultiset.create(new WindowedValueByTimestampComparator());
+      // The ordering must order elements by timestamp, and must not compare two distinct elements
+      // as equal. This is built on the assumption that any element added as a pending element will
+      // be consumed without modifications.
+      Ordering<WindowedValue<?>> pendingElementComparator =
+          new WindowedValueByTimestampComparator().compound(Ordering.arbitrary());
+      this.pendingElements =
+          TreeMultiset.create(pendingElementComparator);
       this.objectTimers = new HashMap<>();
       currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
     }
@@ -626,19 +630,6 @@ public class WatermarkManager {
   private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
 
   /**
-   * A function that takes a WindowedValue and returns the exploded representation of that
-   * {@link WindowedValue}.
-   */
-  private static final Function<WindowedValue<?>, ? extends Iterable<? extends WindowedValue<?>>>
-      EXPLODE_WINDOWS_FN =
-          new Function<WindowedValue<?>, Iterable<? extends WindowedValue<?>>>() {
-            @Override
-            public Iterable<? extends WindowedValue<?>> apply(WindowedValue<?> input) {
-              return input.explodeWindows();
-            }
-          };
-
-  /**
    * For each (Object, PriorityQueue) pair in the provided map, remove each Timer that is before the
    * latestTime argument and put in in the result with the same key, then remove all of the keys
    * which have no more pending timers.
@@ -1128,19 +1119,15 @@ public class WatermarkManager {
     }
 
     private void removePending(CommittedBundle<?> bundle) {
-      inputWatermark.removePendingElements(elementsFromBundle(bundle));
+      inputWatermark.removePendingElements(bundle.getElements());
       synchronizedProcessingInputWatermark.removePending(bundle);
     }
 
     private void addPending(CommittedBundle<?> bundle) {
-      inputWatermark.addPendingElements(elementsFromBundle(bundle));
+      inputWatermark.addPendingElements(bundle.getElements());
       synchronizedProcessingInputWatermark.addPending(bundle);
     }
 
-    private Iterable<? extends WindowedValue<?>> elementsFromBundle(CommittedBundle<?> bundle) {
-      return FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN);
-    }
-
     private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() {
       Map<StructuralKey<?>, List<TimerData>> eventTimeTimers =
           inputWatermark.extractFiredEventTimeTimers();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da57ae81/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index a722b49..8a27243 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -563,6 +563,7 @@ public class WatermarkManagerTest implements Serializable {
         .add(second)
         .add(third)
         .commit(clock.now());
+
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
@@ -585,6 +586,41 @@ public class WatermarkManagerTest implements Serializable {
         keyedWatermarks.getInputWatermark(), not(laterThan(new Instant(-1000L))));
   }
 
+  @Test
+  public void updateWatermarkWithCompletedElementsNotPending() {
+    WindowedValue<Integer> first = WindowedValue.timestampedValueInGlobalWindow(1, new Instant(22));
+    CommittedBundle<Integer> createdBundle = bundleFactory.createBundle(createdInts)
+        .add(first)
+        .commit(clock.now());
+
+    WindowedValue<Integer> second =
+        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(22));
+    CommittedBundle<Integer> neverCreatedBundle = bundleFactory.createBundle(createdInts)
+        .add(second)
+        .commit(clock.now());
+
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            null,
+            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    manager.updateWatermarks(
+        neverCreatedBundle,
+        TimerUpdate.empty(),
+        result(
+            filtered.getProducingTransformInternal(),
+            neverCreatedBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+            Collections.<CommittedBundle<?>>emptyList()),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    manager.refreshAll();
+    TransformWatermarks filteredWms =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    assertThat(filteredWms.getInputWatermark(), equalTo(new Instant(22L)));
+  }
+
   /**
    * Demonstrates that updateWatermarks in the presence of late data is monotonic.
    */


[2/2] incubator-beam git commit: This closes #1069

Posted by ke...@apache.org.
This closes #1069


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c2124ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c2124ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c2124ba

Branch: refs/heads/master
Commit: 7c2124ba4a83add6c0c3c8caff85e6633792033a
Parents: 9fab4ba da57ae8
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 11 15:41:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 11 15:41:26 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/WatermarkManager.java   | 31 +++++------------
 .../runners/direct/WatermarkManagerTest.java    | 36 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c2124ba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------