You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 20:23:22 UTC
[14/50] incubator-beam git commit: Add timer support to DoFnRunner(s)
Add timer support to DoFnRunner(s)
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8af13b01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8af13b01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8af13b01
Branch: refs/heads/gearpump-runner
Commit: 8af13b0102cda6c68601efa4119723900d12ca5c
Parents: c1e1017
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 23 14:21:40 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 16 20:14:19 2016 -0800
----------------------------------------------------------------------
.../apache/beam/runners/core/DoFnRunner.java | 9 +
.../core/LateDataDroppingDoFnRunner.java | 7 +
.../core/PushbackSideInputDoFnRunner.java | 8 +
.../beam/runners/core/SimpleDoFnRunner.java | 236 +++++++++++++++++-
.../beam/runners/core/SimpleOldDoFnRunner.java | 8 +
.../core/PushbackSideInputDoFnRunnerTest.java | 41 +++
.../beam/runners/core/SimpleDoFnRunnerTest.java | 247 +++++++++++++++++++
7 files changed, 555 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 501667e..7c73a34 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -20,8 +20,11 @@ package org.apache.beam.runners.core;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
/**
* An wrapper interface that represents the execution of a {@link DoFn}.
@@ -39,6 +42,12 @@ public interface DoFnRunner<InputT, OutputT> {
void processElement(WindowedValue<InputT> elem);
/**
+ * Calls a {@link DoFn DoFn's} {@link DoFn.OnTimer @OnTimer} method for the given timer
+ * in the given window.
+ */
+ void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);
+
+ /**
* Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} method and performs
* additional tasks, such as flushing in-memory states.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 9bfe9ae..290171a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
@@ -73,6 +74,12 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
}
@Override
+ public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+ TimeDomain timeDomain) {
+ doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+ }
+
+ @Override
public void finishBundle() {
doFnRunner.finishBundle();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 0bb9153..2962832 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -25,8 +25,10 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Instant;
/**
* A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
@@ -109,6 +111,12 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
underlying.processElement(elem);
}
+ @Override
+ public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+ TimeDomain timeDomain) {
+ underlying.onTimer(timerId, window, timestamp, timeDomain);
+ }
+
/**
* Call the underlying {@link DoFnRunner#finishBundle()}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 29ef3ef..a7d82bf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -50,8 +50,10 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.ExecutionContext.StepContext;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
@@ -64,6 +66,7 @@ import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;
@@ -161,6 +164,35 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
}
+ @Override
+ public void onTimer(
+ String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+
+ // The effective timestamp is the timestamp assigned to derived elements, if not
+ // otherwise specified. If this is an event time timer, then they have the timestamp of the
+ // timer itself. Otherwise, they are set to the current input watermark, which is by
+ // definition non-late.
+ Instant effectiveTimestamp;
+ switch (timeDomain) {
+ case EVENT_TIME:
+ effectiveTimestamp = timestamp;
+ break;
+
+ case PROCESSING_TIME:
+ case SYNCHRONIZED_PROCESSING_TIME:
+ effectiveTimestamp = context.stepContext.timerInternals().currentInputWatermarkTime();
+ break;
+
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown time domain: %s", timeDomain));
+ }
+
+ OnTimerArgumentProvider<InputT, OutputT> argumentProvider =
+ new OnTimerArgumentProvider<>(fn, context, window, effectiveTimestamp, timeDomain);
+ invoker.invokeOnTimer(timerId, argumentProvider);
+ }
+
private void invokeProcessElement(WindowedValue<InputT> elem) {
final DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem);
@@ -630,7 +662,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
@Override
public Timer timer(String timerId) {
- throw new UnsupportedOperationException("Timer parameters are not supported.");
+ try {
+ TimerSpec spec =
+ (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
+ return new TimerInternalsTimer(getNamespace(), timerId, spec, stepContext.timerInternals());
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -682,5 +720,201 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
};
}
+
+ }
+
+ /**
+ * A concrete implementation of {@link DoFnInvoker.ArgumentProvider} used for running a {@link
+ * DoFn} on a timer.
+ *
+ * @param <InputT> the type of the {@link DoFn} (main) input elements
+ * @param <OutputT> the type of the {@link DoFn} (main) output elements
+ */
+ private class OnTimerArgumentProvider<InputT, OutputT>
+ extends DoFn<InputT, OutputT>.OnTimerContext
+ implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+ final DoFn<InputT, OutputT> fn;
+ final DoFnContext<InputT, OutputT> context;
+ private final BoundedWindow window;
+ private final Instant timestamp;
+ private final TimeDomain timeDomain;
+
+ /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
+ private StateNamespace namespace;
+
+ /**
+ * The state namespace for this context.
+ *
+ * <p>This context holds exactly one {@link #window}, so the namespace is always the window
+ * namespace built from the runner's window coder and that window. It is computed on the
+ * first call and cached for subsequent calls.
+ */
+ private StateNamespace getNamespace() {
+ if (namespace == null) {
+ namespace = StateNamespaces.window(windowCoder, window);
+ }
+ return namespace;
+ }
+
+ private OnTimerArgumentProvider(
+ DoFn<InputT, OutputT> fn,
+ DoFnContext<InputT, OutputT> context,
+ BoundedWindow window,
+ Instant timestamp,
+ TimeDomain timeDomain) {
+ fn.super();
+ this.fn = fn;
+ this.context = context;
+ this.window = window;
+ this.timestamp = timestamp;
+ this.timeDomain = timeDomain;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return window;
+ }
+
+ @Override
+ public TimeDomain timeDomain() {
+ return timeDomain;
+ }
+
+ @Override
+ public Context context(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException("Context parameters are not supported.");
+ }
+
+ @Override
+ public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException("ProcessContext parameters are not supported.");
+ }
+
+ @Override
+ public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+ return this;
+ }
+
+ @Override
+ public InputProvider<InputT> inputProvider() {
+ throw new UnsupportedOperationException("InputProvider parameters are not supported.");
+ }
+
+ @Override
+ public OutputReceiver<OutputT> outputReceiver() {
+ throw new UnsupportedOperationException("OutputReceiver parameters are not supported.");
+ }
+
+ @Override
+ public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+ throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
+ }
+
+ @Override
+ public State state(String stateId) {
+ try {
+ StateSpec<?, ?> spec =
+ (StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn);
+ return stepContext
+ .stateInternals()
+ .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec));
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ try {
+ TimerSpec spec =
+ (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
+ return new TimerInternalsTimer(getNamespace(), timerId, spec, stepContext.timerInternals());
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return context.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ context.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ context.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ context.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ context.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+ String name,
+ CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ throw new UnsupportedOperationException("Cannot createAggregator in @OnTimer method");
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ throw new UnsupportedOperationException("WindowingInternals are unsupported.");
+ }
+ }
+
+ private static class TimerInternalsTimer implements Timer {
+ private final TimerInternals timerInternals;
+ private final String timerId;
+ private final TimerSpec spec;
+ private final StateNamespace namespace;
+
+ public TimerInternalsTimer(
+ StateNamespace namespace, String timerId, TimerSpec spec, TimerInternals timerInternals) {
+ this.namespace = namespace;
+ this.timerId = timerId;
+ this.spec = spec;
+ this.timerInternals = timerInternals;
+ }
+
+ @Override
+ public void setForNowPlus(Duration durationFromNow) {
+ timerInternals.setTimer(
+ namespace, timerId, getCurrentTime().plus(durationFromNow), spec.getTimeDomain());
+ }
+
+ @Override
+ public void cancel() {
+ timerInternals.deleteTimer(namespace, timerId);
+ }
+
+ private Instant getCurrentTime() {
+ switch(spec.getTimeDomain()) {
+ case EVENT_TIME:
+ return timerInternals.currentInputWatermarkTime();
+ case PROCESSING_TIME:
+ return timerInternals.currentProcessingTime();
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return timerInternals.currentSynchronizedProcessingTime();
+ default:
+ throw new IllegalStateException(
+ String.format("Timer created for unknown time domain %s", spec.getTimeDomain()));
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 1048fdc..342a4a8 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.ExecutionContext.StepContext;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
@@ -107,6 +108,13 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
}
+ @Override
+ public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+ TimeDomain timeDomain) {
+ throw new UnsupportedOperationException(
+ String.format("Timers are not supported by %s", OldDoFn.class.getSimpleName()));
+ }
+
private void invokeProcessElement(WindowedValue<InputT> elem) {
final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
// This can contain user code. Wrap it in case it throws an exception.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index 176ab26..a1cdbf6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.core;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
@@ -37,7 +38,10 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.hamcrest.Matchers;
@@ -215,8 +219,33 @@ public class PushbackSideInputDoFnRunnerTest {
assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
}
+ /** Tests that a call to onTimer gets delegated. */
+ @Test
+ public void testOnTimerCalled() {
+ PushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of());
+
+ String timerId = "fooTimer";
+ IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16));
+ Instant timestamp = new Instant(72);
+
+ // Mocking is not easily compatible with annotation analysis, so we manually record
+ // the method call.
+ runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
+
+ assertThat(
+ underlying.firedTimers,
+ contains(
+ TimerData.of(
+ timerId,
+ StateNamespaces.window(IntervalWindow.getCoder(), window),
+ timestamp,
+ TimeDomain.EVENT_TIME)));
+ }
+
private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
List<WindowedValue<InputT>> inputElems;
+ List<TimerData> firedTimers;
private boolean started = false;
private boolean finished = false;
@@ -224,6 +253,7 @@ public class PushbackSideInputDoFnRunnerTest {
public void startBundle() {
started = true;
inputElems = new ArrayList<>();
+ firedTimers = new ArrayList<>();
}
@Override
@@ -232,6 +262,17 @@ public class PushbackSideInputDoFnRunnerTest {
}
@Override
+ public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+ TimeDomain timeDomain) {
+ firedTimers.add(
+ TimerData.of(
+ timerId,
+ StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
+ timestamp,
+ timeDomain));
+ }
+
+ @Override
public void finishBundle() {
finished = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8af13b01/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
new file mode 100644
index 0000000..f068c19
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link SimpleDoFnRunner}. */
+@RunWith(JUnit4.class)
+public class SimpleDoFnRunnerTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Mock StepContext mockStepContext;
+
+ @Mock TimerInternals mockTimerInternals;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ when(mockStepContext.timerInternals()).thenReturn(mockTimerInternals);
+ }
+
+ @Test
+ public void testProcessElementExceptionsWrappedAsUserCodeException() {
+ ThrowingDoFn fn = new ThrowingDoFn();
+ DoFnRunner<String, String> runner =
+ new SimpleDoFnRunner<>(
+ null,
+ fn,
+ null,
+ null,
+ null,
+ Collections.<TupleTag<?>>emptyList(),
+ mockStepContext,
+ null,
+ WindowingStrategy.of(new GlobalWindows()));
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectCause(is(fn.exceptionToThrow));
+
+ runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
+ }
+
+ @Test
+ public void testOnTimerExceptionsWrappedAsUserCodeException() {
+ ThrowingDoFn fn = new ThrowingDoFn();
+ DoFnRunner<String, String> runner =
+ new SimpleDoFnRunner<>(
+ null,
+ fn,
+ null,
+ null,
+ null,
+ Collections.<TupleTag<?>>emptyList(),
+ mockStepContext,
+ null,
+ WindowingStrategy.of(new GlobalWindows()));
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectCause(is(fn.exceptionToThrow));
+
+ runner.onTimer(
+ ThrowingDoFn.TIMER_ID,
+ GlobalWindow.INSTANCE,
+ new Instant(0),
+ TimeDomain.EVENT_TIME);
+ }
+
+ /**
+ * Tests that a user's call to set a timer gets properly dispatched to the timer internals. From
+ * there on, it is the duty of the runner & step context to set it in whatever way is right for
+ * that runner.
+ */
+ @Test
+ public void testTimerSet() {
+ WindowFn<?, ?> windowFn = new GlobalWindows();
+ DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers(windowFn.windowCoder());
+ DoFnRunner<String, String> runner =
+ new SimpleDoFnRunner<>(
+ null,
+ fn,
+ null,
+ null,
+ null,
+ Collections.<TupleTag<?>>emptyList(),
+ mockStepContext,
+ null,
+ WindowingStrategy.of(new GlobalWindows()));
+
+ // Setting the timer needs the current time, as it is set relative to the current time
+ Instant currentTime = new Instant(42);
+ when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(currentTime);
+
+ runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
+
+ verify(mockTimerInternals)
+ .setTimer(
+ StateNamespaces.window(new GlobalWindows().windowCoder(), GlobalWindow.INSTANCE),
+ DoFnWithTimers.TIMER_ID,
+ currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
+ TimeDomain.EVENT_TIME);
+ }
+
+ /**
+ * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying
+ * {@link DoFn}.
+ */
+ @Test
+ public void testOnTimerCalled() {
+ WindowFn<?, GlobalWindow> windowFn = new GlobalWindows();
+ DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers(windowFn.windowCoder());
+ DoFnRunner<String, String> runner =
+ new SimpleDoFnRunner<>(
+ null,
+ fn,
+ null,
+ null,
+ null,
+ Collections.<TupleTag<?>>emptyList(),
+ mockStepContext,
+ null,
+ WindowingStrategy.of(windowFn));
+
+ Instant currentTime = new Instant(42);
+ Duration offset = Duration.millis(37);
+
+ // Mocking is not easily compatible with annotation analysis, so we manually record
+ // the method call.
+ runner.onTimer(
+ DoFnWithTimers.TIMER_ID,
+ GlobalWindow.INSTANCE,
+ currentTime.plus(offset),
+ TimeDomain.EVENT_TIME);
+
+ assertThat(
+ fn.onTimerInvocations,
+ contains(
+ TimerData.of(
+ DoFnWithTimers.TIMER_ID,
+ StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
+ currentTime.plus(offset),
+ TimeDomain.EVENT_TIME)));
+ }
+
+ static class ThrowingDoFn extends DoFn<String, String> {
+ final Exception exceptionToThrow = new UnsupportedOperationException("Expected exception");
+
+ static final String TIMER_ID = "throwingTimerId";
+
+ @TimerId(TIMER_ID)
+ private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ throw exceptionToThrow;
+ }
+
+ @OnTimer(TIMER_ID)
+ public void onTimer(OnTimerContext context) throws Exception {
+ throw exceptionToThrow;
+ }
+ }
+
+ private static class DoFnWithTimers<W extends BoundedWindow> extends DoFn<String, String> {
+ static final String TIMER_ID = "testTimerId";
+
+ static final Duration TIMER_OFFSET = Duration.millis(100);
+
+ private final Coder<W> windowCoder;
+
+ // Mutable
+ List<TimerData> onTimerInvocations;
+
+ DoFnWithTimers(Coder<W> windowCoder) {
+ this.windowCoder = windowCoder;
+ this.onTimerInvocations = new ArrayList<>();
+ }
+
+ @TimerId(TIMER_ID)
+ private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void process(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
+ timer.setForNowPlus(TIMER_OFFSET);
+ }
+
+ @OnTimer(TIMER_ID)
+ public void onTimer(OnTimerContext context) {
+ onTimerInvocations.add(
+ TimerData.of(
+ DoFnWithTimers.TIMER_ID,
+ StateNamespaces.window(windowCoder, (W) context.window()),
+ context.timestamp(),
+ context.timeDomain()));
+ }
+ }
+}