You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/25 20:12:33 UTC

[2/4] incubator-beam git commit: Remove use of OldDoFn from some DirectRunner tests

Remove use of OldDoFn from some DirectRunner tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d086857
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d086857
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d086857

Branch: refs/heads/master
Commit: 3d086857de87734b087076dad3eca92f625bb417
Parents: 4051357
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 24 16:09:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 25 13:12:17 2016 -0700

----------------------------------------------------------------------
 .../ConsumerTrackingPipelineVisitorTest.java    | 32 +++----
 .../beam/runners/direct/DirectRunnerTest.java   | 40 +++++----
 .../ImmutabilityCheckingBundleFactoryTest.java  |  8 +-
 .../ImmutabilityEnforcementFactoryTest.java     |  8 +-
 .../direct/KeyedPValueTrackingVisitorTest.java  |  8 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |  8 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  | 87 +++++++++---------
 .../direct/ParDoSingleEvaluatorFactoryTest.java | 94 +++++++++-----------
 .../runners/direct/WatermarkManagerTest.java    |  8 +-
 9 files changed, 139 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
index 1c9b5a6..e8f2a7e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
@@ -26,8 +26,8 @@ import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -60,9 +60,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
         p.apply("listCreate", Create.of("foo", "bar"))
             .apply(
                 ParDo.of(
-                    new OldDoFn<String, String>() {
-                      @Override
-                      public void processElement(OldDoFn<String, String>.ProcessContext c)
+                    new DoFn<String, String>() {
+                      @ProcessElement
+                      public void processElement(DoFn<String, String>.ProcessContext c)
                           throws Exception {
                         c.output(Integer.toString(c.element().length()));
                       }
@@ -107,9 +107,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     PCollection<String> transformed =
         created.apply(
             ParDo.of(
-                new OldDoFn<String, String>() {
-                  @Override
-                  public void processElement(OldDoFn<String, String>.ProcessContext c)
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(DoFn<String, String>.ProcessContext c)
                       throws Exception {
                     c.output(Integer.toString(c.element().length()));
                   }
@@ -138,9 +138,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     PCollection<String> transformed =
         created.apply(
             ParDo.of(
-                new OldDoFn<String, String>() {
-                  @Override
-                  public void processElement(OldDoFn<String, String>.ProcessContext c)
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(DoFn<String, String>.ProcessContext c)
                       throws Exception {
                     c.output(Integer.toString(c.element().length()));
                   }
@@ -155,9 +155,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     p.apply(Create.of("1", "2", "3"))
         .apply(
             ParDo.of(
-                new OldDoFn<String, String>() {
-                  @Override
-                  public void processElement(OldDoFn<String, String>.ProcessContext c)
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(DoFn<String, String>.ProcessContext c)
                       throws Exception {
                     c.output(Integer.toString(c.element().length()));
                   }
@@ -180,9 +180,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     PCollection<String> transformed =
         created.apply(
             ParDo.of(
-                new OldDoFn<String, String>() {
-                  @Override
-                  public void processElement(OldDoFn<String, String>.ProcessContext c)
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(DoFn<String, String>.ProcessContext c)
                       throws Exception {
                     c.output(Integer.toString(c.element().length()));
                   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 4027d25..34a5469 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -221,8 +220,8 @@ public class DirectRunnerTest implements Serializable {
 
   @Test
   public void transformDisplayDataExceptionShouldFail() {
-    OldDoFn<Integer, Integer> brokenDoFn = new OldDoFn<Integer, Integer>() {
-      @Override
+    DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {}
 
       @Override
@@ -242,7 +241,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   /**
-   * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
+   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
    * {@link DirectRunner}.
    */
   @Test
@@ -251,8 +250,9 @@ public class DirectRunnerTest implements Serializable {
 
     pipeline
         .apply(Create.of(42))
-        .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() {
-          @Override public void processElement(ProcessContext c) {
+        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
             List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
             c.output(outputList);
             outputList.set(0, 37);
@@ -267,7 +267,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   /**
-   * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
+   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
    * {@link DirectRunner}.
    */
   @Test
@@ -276,8 +276,9 @@ public class DirectRunnerTest implements Serializable {
 
     pipeline
         .apply(Create.of(42))
-        .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() {
-          @Override public void processElement(ProcessContext c) {
+        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
             List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
             c.output(outputList);
             outputList.set(0, 37);
@@ -291,7 +292,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   /**
-   * Tests that a {@link OldDoFn} that mutates an output with a bad equals() still fails
+   * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
    * in the {@link DirectRunner}.
    */
   @Test
@@ -300,8 +301,9 @@ public class DirectRunnerTest implements Serializable {
 
     pipeline
         .apply(Create.of(42))
-        .apply(ParDo.of(new OldDoFn<Integer, byte[]>() {
-          @Override public void processElement(ProcessContext c) {
+        .apply(ParDo.of(new DoFn<Integer, byte[]>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
             byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
             c.output(outputArray);
             outputArray[0] = 0xa;
@@ -316,7 +318,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   /**
-   * Tests that a {@link OldDoFn} that mutates its input with a good equals() fails in the
+   * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
    * {@link DirectRunner}.
    */
   @Test
@@ -326,8 +328,9 @@ public class DirectRunnerTest implements Serializable {
     pipeline
         .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
             .withCoder(ListCoder.of(VarIntCoder.of())))
-        .apply(ParDo.of(new OldDoFn<List<Integer>, Integer>() {
-          @Override public void processElement(ProcessContext c) {
+        .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
             List<Integer> inputList = c.element();
             inputList.set(0, 37);
             c.output(12);
@@ -341,7 +344,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   /**
-   * Tests that a {@link OldDoFn} that mutates an input with a bad equals() still fails
+   * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
    * in the {@link DirectRunner}.
    */
   @Test
@@ -350,8 +353,9 @@ public class DirectRunnerTest implements Serializable {
 
     pipeline
         .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
-        .apply(ParDo.of(new OldDoFn<byte[], Integer>() {
-          @Override public void processElement(ProcessContext c) {
+        .apply(ParDo.of(new DoFn<byte[], Integer>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
             byte[] inputArray = c.element();
             inputArray[0] = 0xa;
             c.output(13);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index d445944..ea44125 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -179,9 +179,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
     intermediate.commit(Instant.now());
   }
 
-  private static class IdentityDoFn<T> extends OldDoFn<T, T> {
-    @Override
-    public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception {
+  private static class IdentityDoFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
       c.output(c.element());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 812d7d5..a7277fe 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -57,9 +57,9 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
         p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
             .apply(
                 ParDo.of(
-                    new OldDoFn<byte[], byte[]>() {
-                      @Override
-                      public void processElement(OldDoFn<byte[], byte[]>.ProcessContext c)
+                    new DoFn<byte[], byte[]>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c)
                           throws Exception {
                         c.element()[0] = 'b';
                       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index ee6b2b4..cf65936 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -31,9 +31,9 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -177,9 +177,9 @@ public class KeyedPValueTrackingVisitorTest {
     }
   }
 
-  private static class IdentityFn<K> extends OldDoFn<K, K> {
-    @Override
-    public void processElement(OldDoFn<K, K>.ProcessContext c) throws Exception {
+  private static class IdentityFn<K> extends DoFn<K, K> {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
       c.output(c.element());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 1a742f0..6d00aa1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -168,7 +168,7 @@ public class ParDoEvaluatorTest {
         ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
   }
 
-  private static class RecorderFn extends OldDoFn<Integer, Integer> {
+  private static class RecorderFn extends DoFn<Integer, Integer> {
     private Collection<Integer> processed;
     private final PCollectionView<Integer> view;
 
@@ -177,8 +177,8 @@ public class ParDoEvaluatorTest {
       this.view = view;
     }
 
-    @Override
-    public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
       processed.add(c.element());
       c.output(c.element() + c.sideInput(view));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index 8b0070b..cc83323 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -32,7 +32,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -41,11 +41,16 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -81,8 +86,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
 
     BoundMulti<String, KV<String, Integer>> pardo =
         ParDo.of(
-                new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(KV.<String, Integer>of(c.element(), c.element().length()));
                     c.sideOutput(elementTag, c.element());
@@ -170,8 +175,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
 
     BoundMulti<String, KV<String, Integer>> pardo =
         ParDo.of(
-                new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(KV.<String, Integer>of(c.element(), c.element().length()));
                     c.sideOutput(elementTag, c.element());
@@ -258,20 +263,17 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
         StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
     BoundMulti<String, KV<String, Integer>> pardo =
         ParDo.of(
-                new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.windowingInternals()
-                        .stateInternals()
-                        .state(StateNamespaces.global(), watermarkTag)
-                        .add(new Instant(20202L + c.element().length()));
-                    c.windowingInternals()
-                        .stateInternals()
-                        .state(
-                            StateNamespaces.window(
-                                GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
-                            bagTag)
-                        .add(c.element());
+                new DoFn<String, KV<String, Integer>>() {
+                  private static final String STATE_ID = "my-state-id";
+
+                  @StateId(STATE_ID)
+                  private final StateSpec<Object, BagState<String>> bagSpec =
+                      StateSpecs.bag(StringUtf8Coder.of());
+
+                  @ProcessElement
+                  public void processElement(
+                      ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
+                    bagState.add(c.element());
                   }
                 })
             .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
@@ -362,34 +364,25 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
 
     BoundMulti<String, KV<String, Integer>> pardo =
         ParDo.of(
-                new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.windowingInternals().stateInternals();
-                    c.windowingInternals()
-                        .timerInternals()
-                        .setTimer(
-                            TimerData.of(
-                                StateNamespaces.window(
-                                    IntervalWindow.getCoder(),
-                                    new IntervalWindow(
-                                        new Instant(0).plus(Duration.standardMinutes(5)),
-                                        new Instant(1)
-                                            .plus(Duration.standardMinutes(5))
-                                            .plus(Duration.standardHours(1)))),
-                                new Instant(54541L),
-                                TimeDomain.EVENT_TIME));
-                    c.windowingInternals()
-                        .timerInternals()
-                        .deleteTimer(
-                            TimerData.of(
-                                StateNamespaces.window(
-                                    IntervalWindow.getCoder(),
-                                    new IntervalWindow(
-                                        new Instant(0),
-                                        new Instant(0).plus(Duration.standardHours(1)))),
-                                new Instant(3400000),
-                                TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+                new DoFn<String, KV<String, Integer>>() {
+                  private static final String EVENT_TIME_TIMER = "event-time-timer";
+                  private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
+
+                  @TimerId(EVENT_TIME_TIMER)
+                  TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+                  @TimerId(SYNC_PROC_TIME_TIMER)
+                  TimerSpec syncProcTimerSpec =
+                      TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+                  @ProcessElement
+                  public void processElement(
+                      ProcessContext c,
+                      @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
+                      @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
+
+                    eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
+                    syncProcTimeTimer.cancel();
                   }
                 })
             .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index e562b28..d22643a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -32,22 +32,25 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
@@ -74,8 +77,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     PCollection<Integer> collection =
         input.apply(
             ParDo.of(
-                new OldDoFn<String, Integer>() {
-                  @Override
+                new DoFn<String, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.element().length());
                   }
@@ -128,8 +131,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     PCollection<Integer> collection =
         input.apply(
             ParDo.of(
-                new OldDoFn<String, Integer>() {
-                  @Override
+                new DoFn<String, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.sideOutput(sideOutputTag, c.element().length());
                   }
@@ -178,26 +181,22 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
 
     PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
 
-    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
-        StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEarliestInputTimestamp());
     final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
     final StateNamespace windowNs =
         StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
     ParDo.Bound<String, KV<String, Integer>> pardo =
         ParDo.of(
-            new OldDoFn<String, KV<String, Integer>>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                c.windowingInternals()
-                    .stateInternals()
-                    .state(StateNamespaces.global(), watermarkTag)
-                    .add(new Instant(124443L - c.element().length()));
-                c.windowingInternals()
-                    .stateInternals()
-                    .state(
-                        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
-                        bagTag)
-                    .add(c.element());
+            new DoFn<String, KV<String, Integer>>() {
+              private static final String STATE_ID = "my-state-id";
+
+              @StateId(STATE_ID)
+              private final StateSpec<Object, BagState<String>> bagSpec =
+                  StateSpecs.bag(StringUtf8Coder.of());
+
+              @ProcessElement
+              public void processElement(
+                  ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
+                bagState.add(c.element());
               }
             });
     PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
@@ -237,9 +236,6 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
     assertThat(result.getState(), not(nullValue()));
     assertThat(
-        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
-        equalTo(new Instant(124438L)));
-    assertThat(
         result.getState().state(windowNs, bagTag).read(),
         containsInAnyOrder("foo", "bara", "bazam"));
   }
@@ -255,6 +251,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
 
     PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
 
+    // TODO: this timer data is absolute, but the new API only support relative settings.
+    // It will require adjustments when @Ignore is removed
     final TimerData addedTimer =
         TimerData.of(
             StateNamespaces.window(
@@ -276,34 +274,24 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
 
     ParDo.Bound<String, KV<String, Integer>> pardo =
         ParDo.of(
-            new OldDoFn<String, KV<String, Integer>>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                c.windowingInternals().stateInternals();
-                c.windowingInternals()
-                    .timerInternals()
-                    .setTimer(
-                        TimerData.of(
-                            StateNamespaces.window(
-                                IntervalWindow.getCoder(),
-                                new IntervalWindow(
-                                    new Instant(0).plus(Duration.standardMinutes(5)),
-                                    new Instant(1)
-                                        .plus(Duration.standardMinutes(5))
-                                        .plus(Duration.standardHours(1)))),
-                            new Instant(54541L),
-                            TimeDomain.EVENT_TIME));
-                c.windowingInternals()
-                    .timerInternals()
-                    .deleteTimer(
-                        TimerData.of(
-                            StateNamespaces.window(
-                                IntervalWindow.getCoder(),
-                                new IntervalWindow(
-                                    new Instant(0),
-                                    new Instant(0).plus(Duration.standardHours(1)))),
-                            new Instant(3400000),
-                            TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+            new DoFn<String, KV<String, Integer>>() {
+              private static final String EVENT_TIME_TIMER = "event-time-timer";
+              private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
+
+              @TimerId(EVENT_TIME_TIMER)
+              TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+              @TimerId(SYNC_PROC_TIME_TIMER)
+              TimerSpec syncProcTimerSpec =
+                  TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+              @ProcessElement
+              public void processElement(
+                  ProcessContext c,
+                  @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
+                  @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
+                eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
+                syncProcTimeTimer.cancel();
               }
             });
     PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 042abab..1954005 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -47,9 +47,9 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -101,9 +101,9 @@ public class WatermarkManagerTest implements Serializable {
     createdInts = p.apply("createdInts", Create.of(1, 2, 3));
 
     filtered = createdInts.apply("filtered", Filter.greaterThan(1));
-    filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new OldDoFn<Integer, Integer>() {
-      @Override
-      public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception {
+    filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn<Integer, Integer>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
         c.output(c.element() * 2);
       }
     }));