You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/03 20:55:42 UTC
[1/3] beam git commit: No longer reject timers for ParDo in direct
runner
Repository: beam
Updated Branches:
refs/heads/master 467e38596 -> 0616245e6
No longer reject timers for ParDo in direct runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27b016d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27b016d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27b016d9
Branch: refs/heads/master
Commit: 27b016d9cbd422f5cb7a4087d5adaf023c2c04b9
Parents: d4cc0c3
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 10:27:23 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jan 3 06:52:40 2017 -0800
----------------------------------------------------------------------
runners/direct-java/pom.xml | 1 -
.../beam/runners/direct/ParDoMultiOverrideFactory.java | 13 ++-----------
2 files changed, 2 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/27b016d9/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 3578907..0d44136 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -68,7 +68,6 @@
</goals>
<configuration>
<groups>org.apache.beam.sdk.testing.NeedsRunner</groups>
- <excludedGroups>org.apache.beam.sdk.testing.UsesTimersInParDo</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
<dependenciesToScan>
http://git-wip-us.apache.org/repos/asf/beam/blob/27b016d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index b35df87..ceb35ec 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -64,17 +64,8 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.processElement().isSplittable()) {
return new SplittableParDo(transform);
- } else if (signature.timerDeclarations().size() > 0) {
- // Temporarily actually reject timers
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
- DoFn.TimerId.class.getSimpleName(),
- fn.getClass().getName(),
- DoFn.class.getSimpleName(),
- DirectRunner.class.getSimpleName()));
-
- } else if (signature.stateDeclarations().size() > 0) {
+ } else if (signature.stateDeclarations().size() > 0
+ || signature.timerDeclarations().size() > 0) {
// Based on the fact that the signature is stateful, DoFnSignatures ensures
// that it is also keyed
ParDo.BoundMulti<KV<?, ?>, OutputT> keyedTransform =
[3/3] beam git commit: This closes #1667: Support user timers for
ParDo in the direct runner
Posted by ke...@apache.org.
This closes #1667: Support user timers for ParDo in the direct runner
No longer reject timers for ParDo in direct runner
Deliver timers in the direct runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0616245e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0616245e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0616245e
Branch: refs/heads/master
Commit: 0616245e654c60ae94cc2c188f857b74a62d9b24
Parents: 467e385 27b016d
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jan 3 12:55:17 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jan 3 12:55:17 2017 -0800
----------------------------------------------------------------------
runners/direct-java/pom.xml | 1 -
...ecycleManagerRemovingTransformEvaluator.java | 19 +++-
.../beam/runners/direct/ParDoEvaluator.java | 9 ++
.../runners/direct/ParDoEvaluatorFactory.java | 2 +-
.../direct/ParDoMultiOverrideFactory.java | 13 +--
.../direct/StatefulParDoEvaluatorFactory.java | 15 ++-
...leManagerRemovingTransformEvaluatorTest.java | 103 +++++++++----------
.../apache/beam/sdk/transforms/ParDoTest.java | 74 ++++++++++++-
8 files changed, 162 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Deliver timers in the direct runner
Posted by ke...@apache.org.
Deliver timers in the direct runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4cc0c33
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4cc0c33
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4cc0c33
Branch: refs/heads/master
Commit: d4cc0c33fd97d4d7a65412432548d42172b58aa0
Parents: 70ff6bf
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 15:18:44 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jan 3 06:52:40 2017 -0800
----------------------------------------------------------------------
...ecycleManagerRemovingTransformEvaluator.java | 19 +++-
.../beam/runners/direct/ParDoEvaluator.java | 9 ++
.../runners/direct/ParDoEvaluatorFactory.java | 2 +-
.../direct/StatefulParDoEvaluatorFactory.java | 15 ++-
...leManagerRemovingTransformEvaluatorTest.java | 103 +++++++++----------
.../apache/beam/sdk/transforms/ParDoTest.java | 74 ++++++++++++-
6 files changed, 160 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index fb13b0f..226e499 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.direct;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,16 +31,16 @@ import org.slf4j.LoggerFactory;
class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
private static final Logger LOG =
LoggerFactory.getLogger(DoFnLifecycleManagerRemovingTransformEvaluator.class);
- private final TransformEvaluator<InputT> underlying;
+ private final ParDoEvaluator<InputT, ?> underlying;
private final DoFnLifecycleManager lifecycleManager;
- public static <InputT> TransformEvaluator<InputT> wrapping(
- TransformEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
+ public static <InputT> DoFnLifecycleManagerRemovingTransformEvaluator<InputT> wrapping(
+ ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
return new DoFnLifecycleManagerRemovingTransformEvaluator<>(underlying, lifecycleManager);
}
private DoFnLifecycleManagerRemovingTransformEvaluator(
- TransformEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
+ ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
this.underlying = underlying;
this.lifecycleManager = lifecycleManager;
}
@@ -53,6 +55,15 @@ class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements Transfor
}
}
+ public void onTimer(TimerData timer, BoundedWindow window) throws Exception {
+ try {
+ underlying.onTimer(timer, window);
+ } catch (Exception e) {
+ onException(e, "Exception encountered while cleaning up after processing a timer");
+ throw e;
+ }
+ }
+
@Override
public TransformResult<InputT> finishBundle() throws Exception {
try {
http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index a5de4c6..e146470 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -136,6 +137,14 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
}
}
+ public void onTimer(TimerData timer, BoundedWindow window) {
+ try {
+ fnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+ }
+
@Override
public TransformResult<InputT> finishBundle() {
try {
http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 835e6ce..2fc19b7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -93,7 +93,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
* correspond with the type in the unpacked {@link DoFn}, side inputs, and output tags.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
- TransformEvaluator<InputT> createEvaluator(
+ DoFnLifecycleManagerRemovingTransformEvaluator<InputT> createEvaluator(
AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
StructuralKey<?> inputBundleKey,
DoFn<InputT, OutputT> doFn,
http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 003df0f..9582d5c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -21,6 +21,7 @@ import com.google.auto.value.AutoValue;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.Collections;
import org.apache.beam.runners.core.KeyedWorkItem;
@@ -36,6 +37,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateNamespace;
@@ -104,7 +106,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
}
}
- TransformEvaluator<KV<K, InputT>> delegateEvaluator =
+ DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator =
delegateFactory.createEvaluator(
(AppliedPTransform) application,
inputBundle.getKey(),
@@ -210,9 +212,10 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
private static class StatefulParDoEvaluator<K, InputT>
implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {
- private final TransformEvaluator<KV<K, InputT>> delegateEvaluator;
+ private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator;
- public StatefulParDoEvaluator(TransformEvaluator<KV<K, InputT>> delegateEvaluator) {
+ public StatefulParDoEvaluator(
+ DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator) {
this.delegateEvaluator = delegateEvaluator;
}
@@ -220,9 +223,15 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult)
throws Exception {
+ BoundedWindow window = Iterables.getOnlyElement(gbkResult.getWindows());
+
for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
delegateEvaluator.processElement(windowedValue);
}
+
+ for (TimerData timer : gbkResult.getValue().timersIterable()) {
+ delegateEvaluator.onTimer(timer, window);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index b5eec63..a9d51e8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -18,26 +18,30 @@
package org.apache.beam.runners.direct;
-import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateNamespaces;
import org.hamcrest.Matchers;
+import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * Tests for {@link DoFnLifecycleManagerRemovingTransformEvaluator}.
- */
+/** Tests for {@link DoFnLifecycleManagerRemovingTransformEvaluator}. */
@RunWith(JUnit4.class)
public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
private DoFnLifecycleManager lifecycleManager;
@@ -49,24 +53,28 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void delegatesToUnderlying() throws Exception {
- RecordingTransformEvaluator underlying = new RecordingTransformEvaluator();
+ ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
DoFn<?, ?> original = lifecycleManager.get();
TransformEvaluator<Object> evaluator =
DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
WindowedValue<Object> first = WindowedValue.valueInGlobalWindow(new Object());
WindowedValue<Object> second = WindowedValue.valueInGlobalWindow(new Object());
+
evaluator.processElement(first);
- assertThat(underlying.objects, containsInAnyOrder(first));
+ verify(underlying).processElement(first);
+
evaluator.processElement(second);
- evaluator.finishBundle();
+ verify(underlying).processElement(second);
- assertThat(underlying.finishBundleCalled, is(true));
- assertThat(underlying.objects, containsInAnyOrder(second, first));
+ evaluator.finishBundle();
+ verify(underlying).finishBundle();
}
@Test
public void removesOnExceptionInProcessElement() throws Exception {
- ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator();
+ ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ doThrow(Exception.class).when(underlying).processElement(any(WindowedValue.class));
+
DoFn<?, ?> original = lifecycleManager.get();
assertThat(original, not(nullValue()));
TransformEvaluator<Object> evaluator =
@@ -78,65 +86,54 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
assertThat(lifecycleManager.get(), not(Matchers.<DoFn<?, ?>>theInstance(original)));
return;
}
- fail("Expected ThrowingTransformEvaluator to throw on method call");
+ fail("Expected underlying evaluator to throw on method call");
}
@Test
- public void removesOnExceptionInFinishBundle() throws Exception {
- ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator();
+ public void removesOnExceptionInOnTimer() throws Exception {
+ ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ doThrow(Exception.class)
+ .when(underlying)
+ .onTimer(any(TimerData.class), any(BoundedWindow.class));
+
DoFn<?, ?> original = lifecycleManager.get();
- // the LifecycleManager is set when the evaluator starts
assertThat(original, not(nullValue()));
- TransformEvaluator<Object> evaluator =
+ DoFnLifecycleManagerRemovingTransformEvaluator<Object> evaluator =
DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
try {
- evaluator.finishBundle();
+ evaluator.onTimer(
+ TimerData.of("foo", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME),
+ GlobalWindow.INSTANCE);
} catch (Exception e) {
- assertThat(lifecycleManager.get(),
- Matchers.not(Matchers.<DoFn<?, ?>>theInstance(original)));
+ assertThat(lifecycleManager.get(), not(Matchers.<DoFn<?, ?>>theInstance(original)));
return;
}
- fail("Expected ThrowingTransformEvaluator to throw on method call");
+ fail("Expected underlying evaluator to throw on method call");
}
- private class RecordingTransformEvaluator implements TransformEvaluator<Object> {
- private boolean finishBundleCalled;
- private List<WindowedValue<Object>> objects;
-
- public RecordingTransformEvaluator() {
- this.finishBundleCalled = true;
- this.objects = new ArrayList<>();
- }
-
- @Override
- public void processElement(WindowedValue<Object> element) throws Exception {
- objects.add(element);
- }
-
- @Override
- public TransformResult<Object> finishBundle() throws Exception {
- finishBundleCalled = true;
- return null;
- }
- }
+ @Test
+ public void removesOnExceptionInFinishBundle() throws Exception {
+ ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ doThrow(Exception.class).when(underlying).finishBundle();
- private class ThrowingTransformEvaluator implements TransformEvaluator<Object> {
- @Override
- public void processElement(WindowedValue<Object> element) throws Exception {
- throw new Exception();
- }
+ DoFn<?, ?> original = lifecycleManager.get();
+ // the LifecycleManager is set when the evaluator starts
+ assertThat(original, not(nullValue()));
+ TransformEvaluator<Object> evaluator =
+ DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
- @Override
- public TransformResult<Object> finishBundle() throws Exception {
- throw new Exception();
+ try {
+ evaluator.finishBundle();
+ } catch (Exception e) {
+ assertThat(lifecycleManager.get(), Matchers.not(Matchers.<DoFn<?, ?>>theInstance(original)));
+ return;
}
+ fail("Expected underlying evaluator to throw on method call");
}
-
private static class TestFn extends DoFn<Object, Object> {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- }
+ public void processElement(ProcessContext c) throws Exception {}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d4cc0c33/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d95b2d0..2e3fb85 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -50,6 +50,8 @@ import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.testing.NeedsRunner;
@@ -58,6 +60,7 @@ import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.transforms.DoFn.OnTimer;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
@@ -1646,7 +1649,7 @@ public class ParDoTest implements Serializable {
*/
@Test
@Category({RunnableOnService.class, UsesTimersInParDo.class})
- public void testSimpleEventTimeTimer() throws Exception {
+ public void testEventTimeTimerBounded() throws Exception {
final String timerId = "foo";
DoFn<KV<String, Integer>, Integer> fn =
@@ -1673,6 +1676,75 @@ public class ParDoTest implements Serializable {
}
@Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+ public void testSimpleProcessingTimerTimer() throws Exception {
+ final String timerId = "foo";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+ timer.setForNowPlus(Duration.standardSeconds(1));
+ context.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OnTimerContext context) {
+ context.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .addElements(KV.of("hello", 37))
+ .advanceProcessingTime(Duration.standardSeconds(2))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3, 42);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+ public void testEventTimeTimerUnbounded() throws Exception {
+ final String timerId = "foo";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+ timer.setForNowPlus(Duration.standardSeconds(1));
+ context.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OnTimerContext context) {
+ context.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream = TestStream.create(KvCoder
+ .of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(KV.of("hello", 37))
+ .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3, 42);
+ pipeline.run();
+ }
+
+ @Test
public void testWithOutputTagsDisplayData() {
DoFn<String, String> fn = new DoFn<String, String>() {
@ProcessElement