You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/28 14:47:43 UTC
[09/50] incubator-beam git commit: Remove use of OldDoFn from some
DirectRunner tests
Remove use of OldDoFn from some DirectRunner tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d086857
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d086857
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d086857
Branch: refs/heads/apex-runner
Commit: 3d086857de87734b087076dad3eca92f625bb417
Parents: 4051357
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 24 16:09:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 25 13:12:17 2016 -0700
----------------------------------------------------------------------
.../ConsumerTrackingPipelineVisitorTest.java | 32 +++----
.../beam/runners/direct/DirectRunnerTest.java | 40 +++++----
.../ImmutabilityCheckingBundleFactoryTest.java | 8 +-
.../ImmutabilityEnforcementFactoryTest.java | 8 +-
.../direct/KeyedPValueTrackingVisitorTest.java | 8 +-
.../beam/runners/direct/ParDoEvaluatorTest.java | 8 +-
.../direct/ParDoMultiEvaluatorFactoryTest.java | 87 +++++++++---------
.../direct/ParDoSingleEvaluatorFactoryTest.java | 94 +++++++++-----------
.../runners/direct/WatermarkManagerTest.java | 8 +-
9 files changed, 139 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
index 1c9b5a6..e8f2a7e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
@@ -26,8 +26,8 @@ import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -60,9 +60,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
p.apply("listCreate", Create.of("foo", "bar"))
.apply(
ParDo.of(
- new OldDoFn<String, String>() {
- @Override
- public void processElement(OldDoFn<String, String>.ProcessContext c)
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
@@ -107,9 +107,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
PCollection<String> transformed =
created.apply(
ParDo.of(
- new OldDoFn<String, String>() {
- @Override
- public void processElement(OldDoFn<String, String>.ProcessContext c)
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
@@ -138,9 +138,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
PCollection<String> transformed =
created.apply(
ParDo.of(
- new OldDoFn<String, String>() {
- @Override
- public void processElement(OldDoFn<String, String>.ProcessContext c)
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
@@ -155,9 +155,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
p.apply(Create.of("1", "2", "3"))
.apply(
ParDo.of(
- new OldDoFn<String, String>() {
- @Override
- public void processElement(OldDoFn<String, String>.ProcessContext c)
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
@@ -180,9 +180,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
PCollection<String> transformed =
created.apply(
ParDo.of(
- new OldDoFn<String, String>() {
- @Override
- public void processElement(OldDoFn<String, String>.ProcessContext c)
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 4027d25..34a5469 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -221,8 +220,8 @@ public class DirectRunnerTest implements Serializable {
@Test
public void transformDisplayDataExceptionShouldFail() {
- OldDoFn<Integer, Integer> brokenDoFn = new OldDoFn<Integer, Integer>() {
- @Override
+ DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {}
@Override
@@ -242,7 +241,7 @@ public class DirectRunnerTest implements Serializable {
}
/**
- * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
+ * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
* {@link DirectRunner}.
*/
@Test
@@ -251,8 +250,9 @@ public class DirectRunnerTest implements Serializable {
pipeline
.apply(Create.of(42))
- .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() {
- @Override public void processElement(ProcessContext c) {
+ .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
c.output(outputList);
outputList.set(0, 37);
@@ -267,7 +267,7 @@ public class DirectRunnerTest implements Serializable {
}
/**
- * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
+ * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
* {@link DirectRunner}.
*/
@Test
@@ -276,8 +276,9 @@ public class DirectRunnerTest implements Serializable {
pipeline
.apply(Create.of(42))
- .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() {
- @Override public void processElement(ProcessContext c) {
+ .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
c.output(outputList);
outputList.set(0, 37);
@@ -291,7 +292,7 @@ public class DirectRunnerTest implements Serializable {
}
/**
- * Tests that a {@link OldDoFn} that mutates an output with a bad equals() still fails
+ * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
* in the {@link DirectRunner}.
*/
@Test
@@ -300,8 +301,9 @@ public class DirectRunnerTest implements Serializable {
pipeline
.apply(Create.of(42))
- .apply(ParDo.of(new OldDoFn<Integer, byte[]>() {
- @Override public void processElement(ProcessContext c) {
+ .apply(ParDo.of(new DoFn<Integer, byte[]>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
c.output(outputArray);
outputArray[0] = 0xa;
@@ -316,7 +318,7 @@ public class DirectRunnerTest implements Serializable {
}
/**
- * Tests that a {@link OldDoFn} that mutates its input with a good equals() fails in the
+ * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
* {@link DirectRunner}.
*/
@Test
@@ -326,8 +328,9 @@ public class DirectRunnerTest implements Serializable {
pipeline
.apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
.withCoder(ListCoder.of(VarIntCoder.of())))
- .apply(ParDo.of(new OldDoFn<List<Integer>, Integer>() {
- @Override public void processElement(ProcessContext c) {
+ .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
List<Integer> inputList = c.element();
inputList.set(0, 37);
c.output(12);
@@ -341,7 +344,7 @@ public class DirectRunnerTest implements Serializable {
}
/**
- * Tests that a {@link OldDoFn} that mutates an input with a bad equals() still fails
+ * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
* in the {@link DirectRunner}.
*/
@Test
@@ -350,8 +353,9 @@ public class DirectRunnerTest implements Serializable {
pipeline
.apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
- .apply(ParDo.of(new OldDoFn<byte[], Integer>() {
- @Override public void processElement(ProcessContext c) {
+ .apply(ParDo.of(new DoFn<byte[], Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
byte[] inputArray = c.element();
inputArray[0] = 0xa;
c.output(13);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index d445944..ea44125 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -179,9 +179,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
intermediate.commit(Instant.now());
}
- private static class IdentityDoFn<T> extends OldDoFn<T, T> {
- @Override
- public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception {
+ private static class IdentityDoFn<T> extends DoFn<T, T> {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 812d7d5..a7277fe 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.WindowedValue;
@@ -57,9 +57,9 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
.apply(
ParDo.of(
- new OldDoFn<byte[], byte[]>() {
- @Override
- public void processElement(OldDoFn<byte[], byte[]>.ProcessContext c)
+ new DoFn<byte[], byte[]>() {
+ @ProcessElement
+ public void processElement(ProcessContext c)
throws Exception {
c.element()[0] = 'b';
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index ee6b2b4..cf65936 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -31,9 +31,9 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -177,9 +177,9 @@ public class KeyedPValueTrackingVisitorTest {
}
}
- private static class IdentityFn<K> extends OldDoFn<K, K> {
- @Override
- public void processElement(OldDoFn<K, K>.ProcessContext c) throws Exception {
+ private static class IdentityFn<K> extends DoFn<K, K> {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 1a742f0..6d00aa1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -168,7 +168,7 @@ public class ParDoEvaluatorTest {
ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
}
- private static class RecorderFn extends OldDoFn<Integer, Integer> {
+ private static class RecorderFn extends DoFn<Integer, Integer> {
private Collection<Integer> processed;
private final PCollectionView<Integer> view;
@@ -177,8 +177,8 @@ public class ParDoEvaluatorTest {
this.view = view;
}
- @Override
- public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
processed.add(c.element());
c.output(c.element() + c.sideInput(view));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index 8b0070b..cc83323 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -32,7 +32,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -41,11 +41,16 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -81,8 +86,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
BoundMulti<String, KV<String, Integer>> pardo =
ParDo.of(
- new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.<String, Integer>of(c.element(), c.element().length()));
c.sideOutput(elementTag, c.element());
@@ -170,8 +175,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
BoundMulti<String, KV<String, Integer>> pardo =
ParDo.of(
- new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.<String, Integer>of(c.element(), c.element().length()));
c.sideOutput(elementTag, c.element());
@@ -258,20 +263,17 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
BoundMulti<String, KV<String, Integer>> pardo =
ParDo.of(
- new OldDoFn<String, KV<String, Integer>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.windowingInternals()
- .stateInternals()
- .state(StateNamespaces.global(), watermarkTag)
- .add(new Instant(20202L + c.element().length()));
- c.windowingInternals()
- .stateInternals()
- .state(
- StateNamespaces.window(
- GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
- bagTag)
- .add(c.element());
+ new DoFn<String, KV<String, Integer>>() {
+ private static final String STATE_ID = "my-state-id";
+
+ @StateId(STATE_ID)
+ private final StateSpec<Object, BagState<String>> bagSpec =
+ StateSpecs.bag(StringUtf8Coder.of());
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
+ bagState.add(c.element());
}
})
.withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
@@ -362,34 +364,25 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
BoundMulti<String, KV<String, Integer>> pardo =
ParDo.of(
- new OldDoFn<String, KV<String, Integer>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.windowingInternals().stateInternals();
- c.windowingInternals()
- .timerInternals()
- .setTimer(
- TimerData.of(
- StateNamespaces.window(
- IntervalWindow.getCoder(),
- new IntervalWindow(
- new Instant(0).plus(Duration.standardMinutes(5)),
- new Instant(1)
- .plus(Duration.standardMinutes(5))
- .plus(Duration.standardHours(1)))),
- new Instant(54541L),
- TimeDomain.EVENT_TIME));
- c.windowingInternals()
- .timerInternals()
- .deleteTimer(
- TimerData.of(
- StateNamespaces.window(
- IntervalWindow.getCoder(),
- new IntervalWindow(
- new Instant(0),
- new Instant(0).plus(Duration.standardHours(1)))),
- new Instant(3400000),
- TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+ new DoFn<String, KV<String, Integer>>() {
+ private static final String EVENT_TIME_TIMER = "event-time-timer";
+ private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
+
+ @TimerId(EVENT_TIME_TIMER)
+ TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @TimerId(SYNC_PROC_TIME_TIMER)
+ TimerSpec syncProcTimerSpec =
+ TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
+ @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
+
+ eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
+ syncProcTimeTimer.cancel();
}
})
.withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index e562b28..d22643a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -32,22 +32,25 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
@@ -74,8 +77,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
PCollection<Integer> collection =
input.apply(
ParDo.of(
- new OldDoFn<String, Integer>() {
- @Override
+ new DoFn<String, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().length());
}
@@ -128,8 +131,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
PCollection<Integer> collection =
input.apply(
ParDo.of(
- new OldDoFn<String, Integer>() {
- @Override
+ new DoFn<String, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.sideOutput(sideOutputTag, c.element().length());
}
@@ -178,26 +181,22 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
- final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
- StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEarliestInputTimestamp());
final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
final StateNamespace windowNs =
StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
ParDo.Bound<String, KV<String, Integer>> pardo =
ParDo.of(
- new OldDoFn<String, KV<String, Integer>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.windowingInternals()
- .stateInternals()
- .state(StateNamespaces.global(), watermarkTag)
- .add(new Instant(124443L - c.element().length()));
- c.windowingInternals()
- .stateInternals()
- .state(
- StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
- bagTag)
- .add(c.element());
+ new DoFn<String, KV<String, Integer>>() {
+ private static final String STATE_ID = "my-state-id";
+
+ @StateId(STATE_ID)
+ private final StateSpec<Object, BagState<String>> bagSpec =
+ StateSpecs.bag(StringUtf8Coder.of());
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
+ bagState.add(c.element());
}
});
PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
@@ -237,9 +236,6 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
assertThat(result.getState(), not(nullValue()));
assertThat(
- result.getState().state(StateNamespaces.global(), watermarkTag).read(),
- equalTo(new Instant(124438L)));
- assertThat(
result.getState().state(windowNs, bagTag).read(),
containsInAnyOrder("foo", "bara", "bazam"));
}
@@ -255,6 +251,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+ // TODO: this timer data is absolute, but the new API only support relative settings.
+ // It will require adjustments when @Ignore is removed
final TimerData addedTimer =
TimerData.of(
StateNamespaces.window(
@@ -276,34 +274,24 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
ParDo.Bound<String, KV<String, Integer>> pardo =
ParDo.of(
- new OldDoFn<String, KV<String, Integer>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.windowingInternals().stateInternals();
- c.windowingInternals()
- .timerInternals()
- .setTimer(
- TimerData.of(
- StateNamespaces.window(
- IntervalWindow.getCoder(),
- new IntervalWindow(
- new Instant(0).plus(Duration.standardMinutes(5)),
- new Instant(1)
- .plus(Duration.standardMinutes(5))
- .plus(Duration.standardHours(1)))),
- new Instant(54541L),
- TimeDomain.EVENT_TIME));
- c.windowingInternals()
- .timerInternals()
- .deleteTimer(
- TimerData.of(
- StateNamespaces.window(
- IntervalWindow.getCoder(),
- new IntervalWindow(
- new Instant(0),
- new Instant(0).plus(Duration.standardHours(1)))),
- new Instant(3400000),
- TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+ new DoFn<String, KV<String, Integer>>() {
+ private static final String EVENT_TIME_TIMER = "event-time-timer";
+ private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
+
+ @TimerId(EVENT_TIME_TIMER)
+ TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @TimerId(SYNC_PROC_TIME_TIMER)
+ TimerSpec syncProcTimerSpec =
+ TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
+ @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
+ eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
+ syncProcTimeTimer.cancel();
}
});
PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 042abab..1954005 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -47,9 +47,9 @@ import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -101,9 +101,9 @@ public class WatermarkManagerTest implements Serializable {
createdInts = p.apply("createdInts", Create.of(1, 2, 3));
filtered = createdInts.apply("filtered", Filter.greaterThan(1));
- filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new OldDoFn<Integer, Integer>() {
- @Override
- public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception {
+ filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn<Integer, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
c.output(c.element() * 2);
}
}));