You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2018/03/10 01:07:55 UTC

[beam] branch master updated: [BEAM-3806] Fix direct-runner hang (#4829)

This is an automated email from the ASF dual-hosted git repository.

bchambers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2180974  [BEAM-3806] Fix direct-runner hang (#4829)
2180974 is described below

commit 218097454f69288aa68e457ce0f9ea2e40dbb86b
Author: Ben Chambers <bj...@users.noreply.github.com>
AuthorDate: Fri Mar 9 17:07:51 2018 -0800

    [BEAM-3806] Fix direct-runner hang (#4829)
    
    Fix handling of timers in WatermarkManager
    
    1. TimerUpdates should use an ordered collection so that the most recent
       timestamp set by user code is the one taken throughout the system.
    2. Application of TimerUpdates needs to keep all three data structures
       in sync.
    3. Add unit tests verifying that the data structures are kept consistent.
    
    The added ParDoTest case (duplicateTimerSetting) hangs when run with the
    DirectRunner without the other changes.
---
 .../beam/runners/direct/WatermarkManager.java      | 38 ++++++++-------
 .../beam/runners/direct/WatermarkManagerTest.java  | 54 ++++++++++++++++++++++
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 33 +++++++++++++
 3 files changed, 108 insertions(+), 17 deletions(-)

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index b01f166..747a667 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -26,7 +26,6 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.SortedMultiset;
@@ -39,6 +38,7 @@ import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -143,7 +143,7 @@ class WatermarkManager {
    * timestamp which indicates we have received all of the data and there will be no more on-time or
    * late data. This value is represented by {@link WatermarkManager#THE_END_OF_TIME}.
    */
-  private interface Watermark {
+  @VisibleForTesting interface Watermark {
     /**
      * Returns the current value of this watermark.
      */
@@ -211,13 +211,13 @@ class WatermarkManager {
    *
    * <p>See {@link #refresh()} for more information.
    */
-  private static class AppliedPTransformInputWatermark implements Watermark {
+  @VisibleForTesting static class AppliedPTransformInputWatermark implements Watermark {
     private final Collection<? extends Watermark> inputWatermarks;
     private final SortedMultiset<CommittedBundle<?>> pendingElements;
 
     // This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key
     // minimum
-    private final SortedMultiset<Instant> pendingTimers;
+    private final SortedMultiset<TimerData> pendingTimers;
 
     // Entries in this table represent the authoritative timestamp for which
     // a per-key-and-StateNamespace timer is set.
@@ -290,15 +290,15 @@ class WatermarkManager {
       pendingElements.remove(completed);
     }
 
-    private synchronized Instant getEarliestTimerTimestamp() {
+    @VisibleForTesting synchronized Instant getEarliestTimerTimestamp() {
       if (pendingTimers.isEmpty()) {
         return BoundedWindow.TIMESTAMP_MAX_VALUE;
       } else {
-        return pendingTimers.firstEntry().getElement();
+        return pendingTimers.firstEntry().getElement().getTimestamp();
       }
     }
 
-    private synchronized void updateTimers(TimerUpdate update) {
+    @VisibleForTesting synchronized void updateTimers(TimerUpdate update) {
       NavigableSet<TimerData> keyTimers =
           objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>());
       Table<StateNamespace, String, TimerData> existingTimersForKey =
@@ -311,10 +311,12 @@ class WatermarkManager {
               existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
 
           if (existingTimer == null) {
-            pendingTimers.add(timer.getTimestamp());
+            pendingTimers.add(timer);
             keyTimers.add(timer);
           } else if (!existingTimer.equals(timer)) {
+            pendingTimers.remove(existingTimer);
             keyTimers.remove(existingTimer);
+            pendingTimers.add(timer);
             keyTimers.add(timer);
           } // else the timer is already set identically, so noop
 
@@ -329,7 +331,7 @@ class WatermarkManager {
               existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
 
           if (existingTimer != null) {
-            pendingTimers.remove(existingTimer.getTimestamp());
+            pendingTimers.remove(existingTimer);
             keyTimers.remove(existingTimer);
             existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
           }
@@ -338,12 +340,14 @@ class WatermarkManager {
 
       for (TimerData timer : update.getCompletedTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          pendingTimers.remove(timer.getTimestamp());
+          keyTimers.remove(timer);
+          pendingTimers.remove(timer);
         }
       }
     }
 
-    private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
+    @VisibleForTesting
+    synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
       return extractFiredTimers(currentWatermark.get(), objectTimers);
     }
 
@@ -1363,9 +1367,9 @@ class WatermarkManager {
 
       private TimerUpdateBuilder(StructuralKey<?> key) {
         this.key = key;
-        this.completedTimers = new HashSet<>();
-        this.setTimers = new HashSet<>();
-        this.deletedTimers = new HashSet<>();
+        this.completedTimers = new LinkedHashSet<>();
+        this.setTimers = new LinkedHashSet<>();
+        this.deletedTimers = new LinkedHashSet<>();
       }
 
       /**
@@ -1409,9 +1413,9 @@ class WatermarkManager {
       public TimerUpdate build() {
         return new TimerUpdate(
             key,
-            ImmutableSet.copyOf(completedTimers),
-            ImmutableSet.copyOf(setTimers),
-            ImmutableSet.copyOf(deletedTimers));
+            ImmutableList.copyOf(completedTimers),
+            ImmutableList.copyOf(setTimers),
+            ImmutableList.copyOf(deletedTimers));
       }
     }
 
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index c0ad157..4f6d4da 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -19,10 +19,14 @@ package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -32,15 +36,18 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
+import org.apache.beam.runners.direct.WatermarkManager.AppliedPTransformInputWatermark;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.direct.WatermarkManager.Watermark;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -76,6 +83,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
 
 /**
  * Tests for {@link WatermarkManager}.
@@ -1476,6 +1484,52 @@ public class WatermarkManagerTest implements Serializable {
   }
 
   @Test
+  public void inputWatermarkDuplicates() {
+    Watermark mockWatermark = Mockito.mock(Watermark.class);
+
+    AppliedPTransformInputWatermark underTest =
+        new AppliedPTransformInputWatermark(ImmutableList.of(mockWatermark));
+
+    // Refresh
+    when(mockWatermark.get()).thenReturn(new Instant(0));
+    underTest.refresh();
+    assertEquals(new Instant(0), underTest.get());
+
+    // Apply a timer update
+    StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
+    TimerData timer1 = TimerData
+        .of("a", StateNamespaces.global(), new Instant(100), TimeDomain.EVENT_TIME);
+    TimerData timer2 = TimerData
+        .of("a", StateNamespaces.global(), new Instant(200), TimeDomain.EVENT_TIME);
+    underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build());
+
+    // Only the last timer update should be observable
+    assertEquals(timer2.getTimestamp(), underTest.getEarliestTimerTimestamp());
+
+    // Advance the input watermark
+    when(mockWatermark.get()).thenReturn(new Instant(1000));
+    underTest.refresh();
+    assertEquals(new Instant(1000), underTest.get()); // input watermark is not held by timers
+
+    // Examine the fired event time timers
+    Map<StructuralKey<?>, List<TimerData>> fired = underTest.extractFiredEventTimeTimers();
+    List<TimerData> timers = fired.get(key);
+    assertNotNull(timers);
+    assertThat(timers, contains(timer2));
+
+    // Update based on timer firings
+    underTest.updateTimers(TimerUpdate.builder(key)
+        .withCompletedTimers(timers).build());
+
+    // Now we should be able to advance
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, underTest.getEarliestTimerTimestamp());
+
+    // Nothing left to fire
+    fired = underTest.extractFiredEventTimeTimers();
+    assertThat(fired.entrySet(), empty());
+  }
+
+  @Test
   public void timerUpdateBuilderBuildAddsAllAddedTimers() {
     TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), TimeDomain.EVENT_TIME);
     TimerData deleted =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index f6f05ad..b5026b0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3353,4 +3353,37 @@ public class ParDoTest implements Serializable {
 
     pipeline.run();
   }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void duplicateTimerSetting() {
+    TestStream<KV<String, String>> stream = TestStream
+        .create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+        .addElements(KV.of("key1", "v1"))
+        .advanceWatermarkToInfinity();
+
+    PCollection<String> result = pipeline
+        .apply(stream)
+        .apply(ParDo.of(new TwoTimerDoFn()));
+    PAssert.that(result).containsInAnyOrder("It works");
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class TwoTimerDoFn extends DoFn<KV<String, String>, String> {
+    @TimerId("timer")
+    private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void process(ProcessContext c,
+        @TimerId("timer") Timer timer) {
+      timer.offset(Duration.standardMinutes(10)).setRelative();
+      timer.offset(Duration.standardMinutes(30)).setRelative();
+    }
+
+    @OnTimer("timer")
+    public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
+      c.output("It works");
+    }
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
bchambers@apache.org.