You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/17 21:11:32 UTC

[1/5] incubator-beam git commit: Make TimerSpec and StateSpec fields accessible

Repository: incubator-beam
Updated Branches:
  refs/heads/master 5255a3381 -> c28957d16


Make TimerSpec and StateSpec fields accessible


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1e1017d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1e1017d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1e1017d

Branch: refs/heads/master
Commit: c1e1017d6e4d75aee7f32cc3d08b9e2a7c21dbb2
Parents: ffe3ab3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:09:06 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 16 20:10:37 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java   | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1e1017d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index e3ba966..d72cea4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -1040,6 +1040,8 @@ public class DoFnSignatures {
       ErrorReporter errors, Class<?> fnClazz) {
     Map<String, DoFnSignature.TimerDeclaration> declarations = new HashMap<>();
     for (Field field : declaredFieldsWithAnnotation(DoFn.TimerId.class, fnClazz, DoFn.class)) {
+      // TimerSpec fields may generally be private, but will be accessed via the signature
+      field.setAccessible(true);
       String id = field.getAnnotation(DoFn.TimerId.class).value();
       validateTimerField(errors, declarations, id, field);
       declarations.put(id, DoFnSignature.TimerDeclaration.create(id, field));
@@ -1205,6 +1207,8 @@ public class DoFnSignatures {
     Map<String, DoFnSignature.StateDeclaration> declarations = new HashMap<>();
 
     for (Field field : declaredFieldsWithAnnotation(DoFn.StateId.class, fnClazz, DoFn.class)) {
+      // StateSpec fields may generally be private, but will be accessed via the signature
+      field.setAccessible(true);
       String id = field.getAnnotation(DoFn.StateId.class).value();
 
       if (declarations.containsKey(id)) {


[3/5] incubator-beam git commit: Add timer support to DoFnRunner(s)

Posted by ke...@apache.org.
Add timer support to DoFnRunner(s)


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8af13b01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8af13b01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8af13b01

Branch: refs/heads/master
Commit: 8af13b0102cda6c68601efa4119723900d12ca5c
Parents: c1e1017
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 23 14:21:40 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 16 20:14:19 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunner.java    |   9 +
 .../core/LateDataDroppingDoFnRunner.java        |   7 +
 .../core/PushbackSideInputDoFnRunner.java       |   8 +
 .../beam/runners/core/SimpleDoFnRunner.java     | 236 +++++++++++++++++-
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   8 +
 .../core/PushbackSideInputDoFnRunnerTest.java   |  41 +++
 .../beam/runners/core/SimpleDoFnRunnerTest.java | 247 +++++++++++++++++++
 7 files changed, 555 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 501667e..7c73a34 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -20,8 +20,11 @@ package org.apache.beam.runners.core;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
 
 /**
  * A wrapper interface that represents the execution of a {@link DoFn}.
@@ -39,6 +42,12 @@ public interface DoFnRunner<InputT, OutputT> {
   void processElement(WindowedValue<InputT> elem);
 
   /**
+   * Calls a {@link DoFn DoFn's} {@link DoFn.OnTimer @OnTimer} method for the given timer
+   * in the given window.
+   */
+  void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);
+
+  /**
    * Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} method and performs
    * additional tasks, such as flushing in-memory states.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 9bfe9ae..290171a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -73,6 +74,12 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
   }
 
   @Override
+  public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+      TimeDomain timeDomain) {
+    doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+  }
+
+  @Override
   public void finishBundle() {
     doFnRunner.finishBundle();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 0bb9153..2962832 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -25,8 +25,10 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Instant;
 
 /**
  * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
@@ -109,6 +111,12 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
     underlying.processElement(elem);
   }
 
+  @Override
+  public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+      TimeDomain timeDomain) {
+    underlying.onTimer(timerId, window, timestamp, timeDomain);
+  }
+
   /**
    * Call the underlying {@link DoFnRunner#finishBundle()}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 29ef3ef..a7d82bf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -50,8 +50,10 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
@@ -64,6 +66,7 @@ import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.format.PeriodFormat;
 
@@ -161,6 +164,35 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
   }
 
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+
+    // The effective timestamp is the timestamp given to derived elements, if not otherwise
+    // specified. For an event time timer it is the timestamp of the timer itself. For the
+    // processing time domains it is the current input watermark, which is by definition
+    // non-late.
+    Instant effectiveTimestamp;
+    switch (timeDomain) {
+      case EVENT_TIME:
+        effectiveTimestamp = timestamp;
+        break;
+
+      case PROCESSING_TIME:
+      case SYNCHRONIZED_PROCESSING_TIME:
+        effectiveTimestamp = context.stepContext.timerInternals().currentInputWatermarkTime();
+        break;
+
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown time domain: %s", timeDomain));
+    }
+
+    OnTimerArgumentProvider<InputT, OutputT> argumentProvider =
+        new OnTimerArgumentProvider<>(fn, context, window, effectiveTimestamp, timeDomain);
+    invoker.invokeOnTimer(timerId, argumentProvider);
+  }
+
   private void invokeProcessElement(WindowedValue<InputT> elem) {
     final DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem);
 
@@ -630,7 +662,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
 
     @Override
     public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timer parameters are not supported.");
+      try {
+        TimerSpec spec =
+            (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
+        return new TimerInternalsTimer(getNamespace(), timerId, spec, stepContext.timerInternals());
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     @Override
@@ -682,5 +720,201 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         }
       };
     }
+
+  }
+
+  /**
+   * A concrete implementation of {@link DoFnInvoker.ArgumentProvider} used for running a {@link
+   * DoFn} on a timer.
+   *
+   * @param <InputT> the type of the {@link DoFn} (main) input elements
+   * @param <OutputT> the type of the {@link DoFn} (main) output elements
+   */
+  private class OnTimerArgumentProvider<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.OnTimerContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    final DoFn<InputT, OutputT> fn;
+    final DoFnContext<InputT, OutputT> context;
+    private final BoundedWindow window;
+    private final Instant timestamp;
+    private final TimeDomain timeDomain;
+
+    /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
+    private StateNamespace namespace;
+
+    /**
+     * The state namespace for this context, derived from the window this context was built for.
+     *
+     * <p>The namespace is created on first use from the {@code windowCoder} in scope and this
+     * context's single {@code window}, then cached in the {@code namespace} field so that later
+     * calls return the same instance.
+     */
+    private StateNamespace getNamespace() {
+      if (namespace == null) {
+        namespace = StateNamespaces.window(windowCoder, window);
+      }
+      return namespace;
+    }
+
+    private OnTimerArgumentProvider(
+        DoFn<InputT, OutputT> fn,
+        DoFnContext<InputT, OutputT> context,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain) {
+      fn.super();
+      this.fn = fn;
+      this.context = context;
+      this.window = window;
+      this.timestamp = timestamp;
+      this.timeDomain = timeDomain;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return window;
+    }
+
+    @Override
+    public TimeDomain timeDomain() {
+      return timeDomain;
+    }
+
+    @Override
+    public Context context(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("Context parameters are not supported.");
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("ProcessContext parameters are not supported.");
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("InputProvider parameters are not supported.");
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("OutputReceiver parameters are not supported.");
+    }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+      throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
+    }
+
+    @Override
+    public State state(String stateId) {
+      try {
+        StateSpec<?, ?> spec =
+            (StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn);
+        return stepContext
+            .stateInternals()
+            .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      try {
+        TimerSpec spec =
+            (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
+        return new TimerInternalsTimer(getNamespace(), timerId, spec, stepContext.timerInternals());
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name,
+        CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      throw new UnsupportedOperationException("Cannot createAggregator in @OnTimer method");
+    }
+
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      throw new UnsupportedOperationException("WindowingInternals are unsupported.");
+    }
+  }
+
+  private static class TimerInternalsTimer implements Timer {
+    private final TimerInternals timerInternals;
+    private final String timerId;
+    private final TimerSpec spec;
+    private final StateNamespace namespace;
+
+    public TimerInternalsTimer(
+        StateNamespace namespace, String timerId, TimerSpec spec, TimerInternals timerInternals) {
+      this.namespace = namespace;
+      this.timerId = timerId;
+      this.spec = spec;
+      this.timerInternals = timerInternals;
+    }
+
+    @Override
+    public void setForNowPlus(Duration durationFromNow) {
+      timerInternals.setTimer(
+          namespace, timerId, getCurrentTime().plus(durationFromNow), spec.getTimeDomain());
+    }
+
+    @Override
+    public void cancel() {
+      timerInternals.deleteTimer(namespace, timerId);
+    }
+
+    private Instant getCurrentTime() {
+      switch(spec.getTimeDomain()) {
+        case EVENT_TIME:
+          return timerInternals.currentInputWatermarkTime();
+        case PROCESSING_TIME:
+          return timerInternals.currentProcessingTime();
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return timerInternals.currentSynchronizedProcessingTime();
+        default:
+          throw new IllegalStateException(
+              String.format("Timer created for unknown time domain %s", spec.getTimeDomain()));
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 1048fdc..342a4a8 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -107,6 +108,13 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
     }
   }
 
+  @Override
+  public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+      TimeDomain timeDomain) {
+    throw new UnsupportedOperationException(
+        String.format("Timers are not supported by %s", OldDoFn.class.getSimpleName()));
+  }
+
   private void invokeProcessElement(WindowedValue<InputT> elem) {
     final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
     // This can contain user code. Wrap it in case it throws an exception.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index 176ab26..a1cdbf6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
@@ -37,7 +38,10 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.hamcrest.Matchers;
@@ -215,8 +219,33 @@ public class PushbackSideInputDoFnRunnerTest {
     assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
   }
 
+  /** Tests that a call to onTimer gets delegated. */
+  @Test
+  public void testOnTimerCalled() {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of());
+
+    String timerId = "fooTimer";
+    IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16));
+    Instant timestamp = new Instant(72);
+
+    // Mocking is not easily compatible with annotation analysis, so we manually record
+    // the method call.
+    runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
+
+    assertThat(
+        underlying.firedTimers,
+        contains(
+            TimerData.of(
+                timerId,
+                StateNamespaces.window(IntervalWindow.getCoder(), window),
+                timestamp,
+                TimeDomain.EVENT_TIME)));
+  }
+
   private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
     List<WindowedValue<InputT>> inputElems;
+    List<TimerData> firedTimers;
     private boolean started = false;
     private boolean finished = false;
 
@@ -224,6 +253,7 @@ public class PushbackSideInputDoFnRunnerTest {
     public void startBundle() {
       started = true;
       inputElems = new ArrayList<>();
+      firedTimers = new ArrayList<>();
     }
 
     @Override
@@ -232,6 +262,17 @@ public class PushbackSideInputDoFnRunnerTest {
     }
 
     @Override
+    public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+        TimeDomain timeDomain) {
+      firedTimers.add(
+          TimerData.of(
+              timerId,
+              StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
+              timestamp,
+              timeDomain));
+    }
+
+    @Override
     public void finishBundle() {
       finished = true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
new file mode 100644
index 0000000..f068c19
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link SimpleDoFnRunner}. */
+@RunWith(JUnit4.class)
+public class SimpleDoFnRunnerTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock StepContext mockStepContext;
+
+  @Mock TimerInternals mockTimerInternals;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(mockStepContext.timerInternals()).thenReturn(mockTimerInternals);
+  }
+
+  @Test
+  public void testProcessElementExceptionsWrappedAsUserCodeException() {
+    ThrowingDoFn fn = new ThrowingDoFn();
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(is(fn.exceptionToThrow));
+
+    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
+  }
+
+  @Test
+  public void testOnTimerExceptionsWrappedAsUserCodeException() {
+    ThrowingDoFn fn = new ThrowingDoFn();
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(is(fn.exceptionToThrow));
+
+    runner.onTimer(
+        ThrowingDoFn.TIMER_ID,
+        GlobalWindow.INSTANCE,
+        new Instant(0),
+        TimeDomain.EVENT_TIME);
+  }
+
+  /**
+   * Tests that a user's call to set a timer gets properly dispatched to the timer internals. From
+   * there on, it is the duty of the runner & step context to set it in whatever way is right for
+   * that runner.
+   */
+  @Test
+  public void testTimerSet() {
+    WindowFn<?, ?> windowFn = new GlobalWindows();
+    DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers(windowFn.windowCoder());
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    // Setting the timer needs the current time, because it is set relative to now
+    Instant currentTime = new Instant(42);
+    when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(currentTime);
+
+    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
+
+    verify(mockTimerInternals)
+        .setTimer(
+            StateNamespaces.window(new GlobalWindows().windowCoder(), GlobalWindow.INSTANCE),
+            DoFnWithTimers.TIMER_ID,
+            currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
+            TimeDomain.EVENT_TIME);
+  }
+
+  /**
+   * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying
+   * {@link DoFn}.
+   */
+  @Test
+  public void testOnTimerCalled() {
+    WindowFn<?, GlobalWindow> windowFn = new GlobalWindows();
+    DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers(windowFn.windowCoder());
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(windowFn));
+
+    Instant currentTime = new Instant(42);
+    Duration offset = Duration.millis(37);
+
+    // Mocking is not easily compatible with annotation analysis, so we manually record
+    // the method call.
+    runner.onTimer(
+        DoFnWithTimers.TIMER_ID,
+        GlobalWindow.INSTANCE,
+        currentTime.plus(offset),
+        TimeDomain.EVENT_TIME);
+
+    assertThat(
+        fn.onTimerInvocations,
+        contains(
+            TimerData.of(
+                DoFnWithTimers.TIMER_ID,
+                StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
+                currentTime.plus(offset),
+                TimeDomain.EVENT_TIME)));
+  }
+
+  static class ThrowingDoFn extends DoFn<String, String> {
+    final Exception exceptionToThrow = new UnsupportedOperationException("Expected exception");
+
+    static final String TIMER_ID = "throwingTimerId";
+
+    @TimerId(TIMER_ID)
+    private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      throw exceptionToThrow;
+    }
+
+    @OnTimer(TIMER_ID)
+    public void onTimer(OnTimerContext context) throws Exception {
+      throw exceptionToThrow;
+    }
+  }
+
+  private static class DoFnWithTimers<W extends BoundedWindow> extends DoFn<String, String> {
+    static final String TIMER_ID = "testTimerId";
+
+    static final Duration TIMER_OFFSET = Duration.millis(100);
+
+    private final Coder<W> windowCoder;
+
+    // Mutable
+    List<TimerData> onTimerInvocations;
+
+    DoFnWithTimers(Coder<W> windowCoder) {
+      this.windowCoder = windowCoder;
+      this.onTimerInvocations = new ArrayList<>();
+    }
+
+    @TimerId(TIMER_ID)
+    private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void process(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
+      timer.setForNowPlus(TIMER_OFFSET);
+    }
+
+    @OnTimer(TIMER_ID)
+    public void onTimer(OnTimerContext context) {
+      onTimerInvocations.add(
+          TimerData.of(
+              DoFnWithTimers.TIMER_ID,
+              StateNamespaces.window(windowCoder, (W) context.window()),
+              context.timestamp(),
+              context.timeDomain()));
+    }
+  }
+}


[5/5] incubator-beam git commit: This closes #1612: [BEAM-27] Support timer setting and receiving in SimpleDoFnRunner

Posted by ke...@apache.org.
This closes #1612: [BEAM-27] Support timer setting and receiving in SimpleDoFnRunner

  Use empty SideInputReader, fixes NPE in SimpleDoFnRunnerTest
  Test that SimpleDoFnRunner wraps exceptions in startBundle and finishBundle
  Add timer support to DoFnRunner(s)
  Make TimerSpec and StateSpec fields accessible


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c28957d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c28957d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c28957d1

Branch: refs/heads/master
Commit: c28957d16fb0f63f82f578cf904df61bf7bb63e5
Parents: 5255a33 b78aa66
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat Dec 17 12:59:25 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Dec 17 12:59:25 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunner.java    |   9 +
 .../core/LateDataDroppingDoFnRunner.java        |   7 +
 .../core/PushbackSideInputDoFnRunner.java       |   8 +
 .../beam/runners/core/SimpleDoFnRunner.java     | 236 ++++++++++++++-
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   8 +
 .../core/PushbackSideInputDoFnRunnerTest.java   |  41 +++
 .../beam/runners/core/SimpleDoFnRunnerTest.java | 301 +++++++++++++++++++
 .../sdk/transforms/reflect/DoFnSignatures.java  |   4 +
 8 files changed, 613 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[2/5] incubator-beam git commit: Test that SimpleDoFnRunner wraps exceptions in startBundle and finishBundle

Posted by ke...@apache.org.
Test that SimpleDoFnRunner wraps exceptions in startBundle and finishBundle


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b4c7d10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b4c7d10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b4c7d10

Branch: refs/heads/master
Commit: 3b4c7d103c07e73d30b2ad534a17b3059232dbda
Parents: 8af13b0
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Dec 16 13:43:54 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 16 20:14:19 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunnerTest.java | 53 ++++++++++++++++++++
 1 file changed, 53 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3b4c7d10/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index f068c19..837a162 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -151,6 +151,49 @@ public class SimpleDoFnRunnerTest {
             TimeDomain.EVENT_TIME);
   }
 
+  @Test
+  public void testStartBundleExceptionsWrappedAsUserCodeException() {
+    ThrowingDoFn fn = new ThrowingDoFn();
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(is(fn.exceptionToThrow));
+
+    runner.startBundle();
+  }
+
+  @Test
+  public void testFinishBundleExceptionsWrappedAsUserCodeException() {
+    ThrowingDoFn fn = new ThrowingDoFn();
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(is(fn.exceptionToThrow));
+
+    runner.finishBundle();
+  }
+
+
   /**
    * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying
    * {@link DoFn}.
@@ -200,6 +243,16 @@ public class SimpleDoFnRunnerTest {
     @TimerId(TIMER_ID)
     private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
+    @StartBundle
+    public void startBundle(Context c) throws Exception {
+      throw exceptionToThrow;
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      throw exceptionToThrow;
+    }
+
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       throw exceptionToThrow;


[4/5] incubator-beam git commit: Use empty SideInputReader, fixes NPE in SimpleDoFnRunnerTest

Posted by ke...@apache.org.
Use empty SideInputReader, fixes NPE in SimpleDoFnRunnerTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b78aa669
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b78aa669
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b78aa669

Branch: refs/heads/master
Commit: b78aa669831154f82266eb12ab795442c02f8977
Parents: 3b4c7d1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Dec 16 20:57:06 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 16 20:57:06 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/SimpleDoFnRunnerTest.java | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b78aa669/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 837a162..ec5d375 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
+import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
@@ -77,7 +78,7 @@ public class SimpleDoFnRunnerTest {
         new SimpleDoFnRunner<>(
             null,
             fn,
-            null,
+            NullSideInputReader.empty(),
             null,
             null,
             Collections.<TupleTag<?>>emptyList(),
@@ -98,7 +99,7 @@ public class SimpleDoFnRunnerTest {
         new SimpleDoFnRunner<>(
             null,
             fn,
-            null,
+            NullSideInputReader.empty(),
             null,
             null,
             Collections.<TupleTag<?>>emptyList(),
@@ -129,7 +130,7 @@ public class SimpleDoFnRunnerTest {
         new SimpleDoFnRunner<>(
             null,
             fn,
-            null,
+            NullSideInputReader.empty(),
             null,
             null,
             Collections.<TupleTag<?>>emptyList(),
@@ -158,7 +159,7 @@ public class SimpleDoFnRunnerTest {
         new SimpleDoFnRunner<>(
             null,
             fn,
-            null,
+            NullSideInputReader.empty(),
             null,
             null,
             Collections.<TupleTag<?>>emptyList(),
@@ -179,7 +180,7 @@ public class SimpleDoFnRunnerTest {
         new SimpleDoFnRunner<>(
             null,
             fn,
-            null,
+            NullSideInputReader.empty(),
             null,
             null,
             Collections.<TupleTag<?>>emptyList(),
@@ -206,7 +207,7 @@ public class SimpleDoFnRunnerTest {
         new SimpleDoFnRunner<>(
             null,
             fn,
-            null,
+            NullSideInputReader.empty(),
             null,
             null,
             Collections.<TupleTag<?>>emptyList(),