You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/07 23:48:31 UTC

[2/3] incubator-beam git commit: Track Minimum Element Timestamps within Bundles

Track Minimum Element Timestamps within Bundles

This allows the Watermark Manager to track pending input per bundle
rather than per element, which significantly reduces the amount of
work done for each element when tracking watermarks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a58f1eba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a58f1eba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a58f1eba

Branch: refs/heads/master
Commit: a58f1eba1a3b5ae453f4d65ad409785cb717b2ae
Parents: 317b5e6
Author: Thomas Groh <tg...@google.com>
Authored: Fri Nov 4 12:54:00 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 15:47:02 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |  5 ++
 .../direct/ImmutableListBundleFactory.java      | 21 ++++++++-
 .../beam/runners/direct/WatermarkManager.java   | 48 +++++++++-----------
 .../direct/ImmutableListBundleFactoryTest.java  | 15 +++++-
 4 files changed, 60 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 44d1986..4d5a449 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -151,6 +151,11 @@ public class DirectRunner
     Iterable<WindowedValue<T>> getElements();
 
     /**
+     * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}.
+     */
+    Instant getMinTimestamp();
+
+    /**
      * Returns the processing time output watermark at the time the producing {@link PTransform}
      * committed this bundle. Downstream synchronized processing time watermarks cannot progress
      * past this point before consuming this bundle.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index abc6dd8..6b342d6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
@@ -64,6 +65,7 @@ class ImmutableListBundleFactory implements BundleFactory {
     private final StructuralKey<?> key;
     private boolean committed = false;
     private ImmutableList.Builder<WindowedValue<T>> elements;
+    private Instant minSoFar = BoundedWindow.TIMESTAMP_MAX_VALUE;
 
     /**
      * Create a new {@link UncommittedImmutableListBundle} for the specified {@link PCollection}.
@@ -93,6 +95,9 @@ class ImmutableListBundleFactory implements BundleFactory {
           element,
           pcollection);
       elements.add(element);
+      if (element.getTimestamp().isBefore(minSoFar)) {
+        minSoFar = element.getTimestamp();
+      }
       return this;
     }
 
@@ -102,7 +107,7 @@ class ImmutableListBundleFactory implements BundleFactory {
       committed = true;
       final Iterable<WindowedValue<T>> committedElements = elements.build();
       return CommittedImmutableListBundle.create(
-          pcollection, key, committedElements, synchronizedCompletionTime);
+          pcollection, key, committedElements, minSoFar, synchronizedCompletionTime);
     }
   }
 
@@ -112,9 +117,10 @@ class ImmutableListBundleFactory implements BundleFactory {
         @Nullable PCollection<T> pcollection,
         StructuralKey<?> key,
         Iterable<WindowedValue<T>> committedElements,
+        Instant minElementTimestamp,
         Instant synchronizedCompletionTime) {
       return new AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle<>(
-          pcollection, key, committedElements, synchronizedCompletionTime);
+          pcollection, key, committedElements, minElementTimestamp, synchronizedCompletionTime);
     }
 
     @Override
@@ -123,6 +129,7 @@ class ImmutableListBundleFactory implements BundleFactory {
           getPCollection(),
           getKey(),
           ImmutableList.copyOf(elements),
+          minTimestamp(elements),
           getSynchronizedProcessingOutputWatermark());
     }
 
@@ -136,4 +143,14 @@ class ImmutableListBundleFactory implements BundleFactory {
       return this == obj;
     }
   }
+
+  private static Instant minTimestamp(Iterable<? extends WindowedValue<?>> elements) {
+    Instant minTs = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    for (WindowedValue<?> element : elements) {
+      if (element.getTimestamp().isBefore(minTs)) {
+        minTs = element.getTimestamp();
+      }
+    }
+    return minTs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 2228cd5..f235af0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -27,8 +27,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.TreeMultiset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -55,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.joda.time.Instant;
@@ -123,6 +120,10 @@ import org.joda.time.Instant;
  * </pre>
  */
 public class WatermarkManager {
+  // The maximum number of pending updates to apply in a single call to #tryApplyPendingUpdates
+  private static final int MAX_INCREMENTAL_UPDATES = 10;
+
+
   /**
    * The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a
    * {@link PCollection}.
@@ -203,7 +204,7 @@ public class WatermarkManager {
    */
   private static class AppliedPTransformInputWatermark implements Watermark {
     private final Collection<? extends Watermark> inputWatermarks;
-    private final SortedMultiset<WindowedValue<?>> pendingElements;
+    private final NavigableSet<CommittedBundle<?>> pendingElements;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
 
     private AtomicReference<Instant> currentWatermark;
@@ -213,10 +214,10 @@ public class WatermarkManager {
       // The ordering must order elements by timestamp, and must not compare two distinct elements
       // as equal. This is built on the assumption that any element added as a pending element will
       // be consumed without modifications.
-      Ordering<WindowedValue<?>> pendingElementComparator =
-          new WindowedValueByTimestampComparator().compound(Ordering.arbitrary());
+      Ordering<CommittedBundle<?>> pendingBundleComparator =
+          new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
       this.pendingElements =
-          TreeMultiset.create(pendingElementComparator);
+          new TreeSet<>(pendingBundleComparator);
       this.objectTimers = new HashMap<>();
       currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
     }
@@ -249,25 +250,20 @@ public class WatermarkManager {
         minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get());
       }
       if (!pendingElements.isEmpty()) {
-        minInputWatermark = INSTANT_ORDERING.min(
-            minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp());
+        minInputWatermark =
+            INSTANT_ORDERING.min(minInputWatermark, pendingElements.first().getMinTimestamp());
       }
       Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
       currentWatermark.set(newWatermark);
       return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
     }
 
-    private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) {
-      for (WindowedValue<?> pendingElement : newPending) {
-        pendingElements.add(pendingElement);
-      }
+    private synchronized void addPending(CommittedBundle<?> newPending) {
+      pendingElements.add(newPending);
     }
 
-    private synchronized void removePendingElements(
-        Iterable<? extends WindowedValue<?>> finishedElements) {
-      for (WindowedValue<?> finishedElement : finishedElements) {
-        pendingElements.remove(finishedElement);
-      }
+    private synchronized void removePending(CommittedBundle<?> completed) {
+      pendingElements.remove(completed);
     }
 
     private synchronized void updateTimers(TimerUpdate update) {
@@ -856,7 +852,7 @@ public class WatermarkManager {
   private void tryApplyPendingUpdates() {
     if (refreshLock.tryLock()) {
       try {
-        applyNUpdates(10);
+        applyNUpdates(MAX_INCREMENTAL_UPDATES);
       } finally {
         refreshLock.unlock();
       }
@@ -867,7 +863,7 @@ public class WatermarkManager {
    * Applies all pending updates to this {@link WatermarkManager}, causing the pending state
    * of all {@link TransformWatermarks} to be advanced as far as possible.
    */
-  private void applyPendingUpdates() {
+  private void applyAllPendingUpdates() {
     refreshLock.lock();
     try {
       applyNUpdates(-1);
@@ -944,7 +940,7 @@ public class WatermarkManager {
   synchronized void refreshAll() {
     refreshLock.lock();
     try {
-      applyPendingUpdates();
+      applyAllPendingUpdates();
       Set<AppliedPTransform<?, ?, ?>> toRefresh = pendingRefreshes;
       while (!toRefresh.isEmpty()) {
         toRefresh = refreshAllOf(toRefresh);
@@ -1180,12 +1176,12 @@ public class WatermarkManager {
     }
 
     private void removePending(CommittedBundle<?> bundle) {
-      inputWatermark.removePendingElements(bundle.getElements());
+      inputWatermark.removePending(bundle);
       synchronizedProcessingInputWatermark.removePending(bundle);
     }
 
     private void addPending(CommittedBundle<?> bundle) {
-      inputWatermark.addPendingElements(bundle.getElements());
+      inputWatermark.addPending(bundle);
       synchronizedProcessingInputWatermark.addPending(bundle);
     }
 
@@ -1434,11 +1430,11 @@ public class WatermarkManager {
     }
   }
 
-  private static class WindowedValueByTimestampComparator extends Ordering<WindowedValue<?>> {
+  private static class BundleByElementTimestampComparator extends Ordering<CommittedBundle<?>> {
     @Override
-    public int compare(WindowedValue<?> o1, WindowedValue<?> o2) {
+    public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
       return ComparisonChain.start()
-          .compare(o1.getTimestamp(), o2.getTimestamp())
+          .compare(o1.getMinTimestamp(), o2.getMinTimestamp())
           .result();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index 4a7477f..a36c408 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
@@ -35,6 +36,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -95,15 +97,25 @@ public class ImmutableListBundleFactoryTest {
   afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
     UncommittedBundle<T> bundle = bundleFactory.createRootBundle();
     Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>();
+    Instant minElementTs = BoundedWindow.TIMESTAMP_MAX_VALUE;
     for (WindowedValue<T> elem : elems) {
       bundle.add(elem);
       expectations.add(equalTo(elem));
+      if (elem.getTimestamp().isBefore(minElementTs)) {
+        minElementTs = elem.getTimestamp();
+      }
     }
     Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
         Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
-    CommittedBundle<T> committed = bundle.commit(Instant.now());
+    Instant commitTime = Instant.now();
+    CommittedBundle<T> committed = bundle.commit(commitTime);
     assertThat(committed.getElements(), containsMatcher);
 
+    // Sanity check that the test is meaningful.
+    assertThat(minElementTs, not(equalTo(commitTime)));
+    assertThat(committed.getMinTimestamp(), equalTo(minElementTs));
+    assertThat(committed.getSynchronizedProcessingOutputWatermark(), equalTo(commitTime));
+
     return committed;
   }
 
@@ -149,6 +161,7 @@ public class ImmutableListBundleFactoryTest {
     assertThat(
         withed.getSynchronizedProcessingOutputWatermark(),
         equalTo(committed.getSynchronizedProcessingOutputWatermark()));
+    assertThat(withed.getMinTimestamp(), equalTo(new Instant(2048L)));
   }
 
   @Test