You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/03 20:55:43 UTC

[2/3] beam git commit: Deliver timers in the direct runner

Deliver timers in the direct runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4cc0c33
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4cc0c33
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4cc0c33

Branch: refs/heads/master
Commit: d4cc0c33fd97d4d7a65412432548d42172b58aa0
Parents: 70ff6bf
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 15:18:44 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jan 3 06:52:40 2017 -0800

----------------------------------------------------------------------
 ...ecycleManagerRemovingTransformEvaluator.java |  19 +++-
 .../beam/runners/direct/ParDoEvaluator.java     |   9 ++
 .../runners/direct/ParDoEvaluatorFactory.java   |   2 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |  15 ++-
 ...leManagerRemovingTransformEvaluatorTest.java | 103 +++++++++----------
 .../apache/beam/sdk/transforms/ParDoTest.java   |  74 ++++++++++++-
 6 files changed, 160 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index fb13b0f..226e499 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,16 +31,16 @@ import org.slf4j.LoggerFactory;
 class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
   private static final Logger LOG =
       LoggerFactory.getLogger(DoFnLifecycleManagerRemovingTransformEvaluator.class);
-  private final TransformEvaluator<InputT> underlying;
+  private final ParDoEvaluator<InputT, ?> underlying;
   private final DoFnLifecycleManager lifecycleManager;
 
-  public static <InputT> TransformEvaluator<InputT> wrapping(
-      TransformEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
+  public static <InputT> DoFnLifecycleManagerRemovingTransformEvaluator<InputT> wrapping(
+      ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
     return new DoFnLifecycleManagerRemovingTransformEvaluator<>(underlying, lifecycleManager);
   }
 
   private DoFnLifecycleManagerRemovingTransformEvaluator(
-      TransformEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
+      ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
     this.underlying = underlying;
     this.lifecycleManager = lifecycleManager;
   }
@@ -53,6 +55,15 @@ class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements Transfor
     }
   }
 
+  public void onTimer(TimerData timer, BoundedWindow window) throws Exception {
+    try {
+      underlying.onTimer(timer, window);
+    } catch (Exception e) {
+      onException(e, "Exception encountered while cleaning up after processing a timer");
+      throw e;
+    }
+  }
+
   @Override
   public TransformResult<InputT> finishBundle() throws Exception {
     try {

http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index a5de4c6..e146470 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -136,6 +137,14 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
     }
   }
 
+  public void onTimer(TimerData timer, BoundedWindow window) {
+    try {
+      fnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
+  }
+
   @Override
   public TransformResult<InputT> finishBundle() {
     try {

http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 835e6ce..2fc19b7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -93,7 +93,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
    * correspond with the type in the unpacked {@link DoFn}, side inputs, and output tags.
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
-  TransformEvaluator<InputT> createEvaluator(
+  DoFnLifecycleManagerRemovingTransformEvaluator<InputT> createEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
       StructuralKey<?> inputBundleKey,
       DoFn<InputT, OutputT> doFn,

http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 003df0f..9582d5c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -21,6 +21,7 @@ import com.google.auto.value.AutoValue;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.util.Collections;
 import org.apache.beam.runners.core.KeyedWorkItem;
@@ -36,6 +37,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateNamespace;
@@ -104,7 +106,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
       }
     }
 
-    TransformEvaluator<KV<K, InputT>> delegateEvaluator =
+    DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator =
         delegateFactory.createEvaluator(
             (AppliedPTransform) application,
             inputBundle.getKey(),
@@ -210,9 +212,10 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
   private static class StatefulParDoEvaluator<K, InputT>
       implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {
 
-    private final TransformEvaluator<KV<K, InputT>> delegateEvaluator;
+    private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator;
 
-    public StatefulParDoEvaluator(TransformEvaluator<KV<K, InputT>> delegateEvaluator) {
+    public StatefulParDoEvaluator(
+        DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator) {
       this.delegateEvaluator = delegateEvaluator;
     }
 
@@ -220,9 +223,15 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
     public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult)
         throws Exception {
 
+      BoundedWindow window = Iterables.getOnlyElement(gbkResult.getWindows());
+
       for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
+
+      for (TimerData timer : gbkResult.getValue().timersIterable()) {
+        delegateEvaluator.onTimer(timer, window);
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index b5eec63..a9d51e8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -18,26 +18,30 @@
 
 package org.apache.beam.runners.direct;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.hamcrest.Matchers;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests for {@link DoFnLifecycleManagerRemovingTransformEvaluator}.
- */
+/** Tests for {@link DoFnLifecycleManagerRemovingTransformEvaluator}. */
 @RunWith(JUnit4.class)
 public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
   private DoFnLifecycleManager lifecycleManager;
@@ -49,24 +53,28 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void delegatesToUnderlying() throws Exception {
-    RecordingTransformEvaluator underlying = new RecordingTransformEvaluator();
+    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
     DoFn<?, ?> original = lifecycleManager.get();
     TransformEvaluator<Object> evaluator =
         DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
     WindowedValue<Object> first = WindowedValue.valueInGlobalWindow(new Object());
     WindowedValue<Object> second = WindowedValue.valueInGlobalWindow(new Object());
+
     evaluator.processElement(first);
-    assertThat(underlying.objects, containsInAnyOrder(first));
+    verify(underlying).processElement(first);
+
     evaluator.processElement(second);
-    evaluator.finishBundle();
+    verify(underlying).processElement(second);
 
-    assertThat(underlying.finishBundleCalled, is(true));
-    assertThat(underlying.objects, containsInAnyOrder(second, first));
+    evaluator.finishBundle();
+    verify(underlying).finishBundle();
   }
 
   @Test
   public void removesOnExceptionInProcessElement() throws Exception {
-    ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator();
+    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    doThrow(Exception.class).when(underlying).processElement(any(WindowedValue.class));
+
     DoFn<?, ?> original = lifecycleManager.get();
     assertThat(original, not(nullValue()));
     TransformEvaluator<Object> evaluator =
@@ -78,65 +86,54 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
       assertThat(lifecycleManager.get(), not(Matchers.<DoFn<?, ?>>theInstance(original)));
       return;
     }
-    fail("Expected ThrowingTransformEvaluator to throw on method call");
+    fail("Expected underlying evaluator to throw on method call");
   }
 
   @Test
-  public void removesOnExceptionInFinishBundle() throws Exception {
-    ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator();
+  public void removesOnExceptionInOnTimer() throws Exception {
+    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    doThrow(Exception.class)
+        .when(underlying)
+        .onTimer(any(TimerData.class), any(BoundedWindow.class));
+
     DoFn<?, ?> original = lifecycleManager.get();
-    // the LifecycleManager is set when the evaluator starts
     assertThat(original, not(nullValue()));
-    TransformEvaluator<Object> evaluator =
+    DoFnLifecycleManagerRemovingTransformEvaluator<Object> evaluator =
         DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
 
     try {
-      evaluator.finishBundle();
+      evaluator.onTimer(
+          TimerData.of("foo", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME),
+          GlobalWindow.INSTANCE);
     } catch (Exception e) {
-      assertThat(lifecycleManager.get(),
-          Matchers.not(Matchers.<DoFn<?, ?>>theInstance(original)));
+      assertThat(lifecycleManager.get(), not(Matchers.<DoFn<?, ?>>theInstance(original)));
       return;
     }
-    fail("Expected ThrowingTransformEvaluator to throw on method call");
+    fail("Expected underlying evaluator to throw on method call");
   }
 
-  private class RecordingTransformEvaluator implements TransformEvaluator<Object> {
-    private boolean finishBundleCalled;
-    private List<WindowedValue<Object>> objects;
-
-    public RecordingTransformEvaluator() {
-      this.finishBundleCalled = true;
-      this.objects = new ArrayList<>();
-    }
-
-    @Override
-    public void processElement(WindowedValue<Object> element) throws Exception {
-      objects.add(element);
-    }
-
-    @Override
-    public TransformResult<Object> finishBundle() throws Exception {
-      finishBundleCalled = true;
-      return null;
-    }
-  }
+  @Test
+  public void removesOnExceptionInFinishBundle() throws Exception {
+    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    doThrow(Exception.class).when(underlying).finishBundle();
 
-  private class ThrowingTransformEvaluator implements TransformEvaluator<Object> {
-    @Override
-    public void processElement(WindowedValue<Object> element) throws Exception {
-      throw new Exception();
-    }
+    DoFn<?, ?> original = lifecycleManager.get();
+    // the LifecycleManager is set when the evaluator starts
+    assertThat(original, not(nullValue()));
+    TransformEvaluator<Object> evaluator =
+        DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
 
-    @Override
-    public TransformResult<Object> finishBundle() throws Exception {
-      throw new Exception();
+    try {
+      evaluator.finishBundle();
+    } catch (Exception e) {
+      assertThat(lifecycleManager.get(), Matchers.not(Matchers.<DoFn<?, ?>>theInstance(original)));
+      return;
     }
+    fail("Expected underlying evaluator to throw on method call");
   }
 
-
   private static class TestFn extends DoFn<Object, Object> {
     @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-    }
+    public void processElement(ProcessContext c) throws Exception {}
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d95b2d0..2e3fb85 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -50,6 +50,8 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -58,6 +60,7 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.UsesTimersInParDo;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
@@ -1646,7 +1649,7 @@ public class ParDoTest implements Serializable {
    */
   @Test
   @Category({RunnableOnService.class, UsesTimersInParDo.class})
-  public void testSimpleEventTimeTimer() throws Exception {
+  public void testEventTimeTimerBounded() throws Exception {
     final String timerId = "foo";
 
     DoFn<KV<String, Integer>, Integer> fn =
@@ -1673,6 +1676,75 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+  public void testSimpleProcessingTimerTimer() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.setForNowPlus(Duration.standardSeconds(1));
+            context.output(3);
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(42);
+          }
+        };
+
+    TestStream<KV<String, Integer>> stream =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+            .addElements(KV.of("hello", 37))
+            .advanceProcessingTime(Duration.standardSeconds(2))
+            .advanceWatermarkToInfinity();
+
+    PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(3, 42);
+    pipeline.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+  public void testEventTimeTimerUnbounded() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.setForNowPlus(Duration.standardSeconds(1));
+            context.output(3);
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(42);
+          }
+        };
+
+    TestStream<KV<String, Integer>> stream = TestStream.create(KvCoder
+        .of(StringUtf8Coder.of(), VarIntCoder.of()))
+        .advanceWatermarkTo(new Instant(0))
+        .addElements(KV.of("hello", 37))
+        .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1)))
+        .advanceWatermarkToInfinity();
+
+    PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(3, 42);
+    pipeline.run();
+  }
+
+  @Test
   public void testWithOutputTagsDisplayData() {
     DoFn<String, String> fn = new DoFn<String, String>() {
       @ProcessElement