You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/27 21:57:58 UTC

[1/2] incubator-beam git commit: Closes #1011

Repository: incubator-beam
Updated Branches:
  refs/heads/master 900980246 -> 2571cfcf5


Closes #1011


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2571cfcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2571cfcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2571cfcf

Branch: refs/heads/master
Commit: 2571cfcf5e057141c033101df039d8a4366b0f33
Parents: 9009802 bef0e9d
Author: Dan Halperin <dh...@google.com>
Authored: Tue Sep 27 14:57:49 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Sep 27 14:57:49 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  | 120 +++--
 .../beam/sdk/transforms/DoFnTesterTest.java     | 456 +++++++++++--------
 2 files changed, 338 insertions(+), 238 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Support for @Setup and @Teardown in DoFnTester

Posted by dh...@apache.org.
Support for @Setup and @Teardown in DoFnTester


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bef0e9de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bef0e9de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bef0e9de

Branch: refs/heads/master
Commit: bef0e9de02be051411f20b298168e8477ed1a0da
Parents: 9009802
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Sep 26 16:58:20 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Sep 27 14:57:49 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  | 120 +++--
 .../beam/sdk/transforms/DoFnTesterTest.java     | 456 +++++++++++--------
 2 files changed, 338 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bef0e9de/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 0e018ba..9adb806 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -78,10 +78,10 @@ import org.joda.time.Instant;
  * @param <InputT> the type of the {@link DoFn}'s (main) input elements
  * @param <OutputT> the type of the {@link DoFn}'s (main) output elements
  */
-public class DoFnTester<InputT, OutputT> {
+public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   /**
    * Returns a {@code DoFnTester} supporting unit-testing of the given
-   * {@link DoFn}.
+   * {@link DoFn}. By default, uses {@link CloningBehavior#CLONE_ONCE}.
    */
   @SuppressWarnings("unchecked")
   public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
@@ -91,6 +91,8 @@ public class DoFnTester<InputT, OutputT> {
   /**
    * Returns a {@code DoFnTester} supporting unit-testing of the given
    * {@link OldDoFn}.
+   *
+   * @see #of(DoFn)
    */
   @SuppressWarnings("unchecked")
    public static <InputT, OutputT> DoFnTester<InputT, OutputT>
@@ -108,8 +110,11 @@ public class DoFnTester<InputT, OutputT> {
    * {@link DoFn} takes no side inputs.
    */
   public void setSideInputs(Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) {
+    checkState(
+        state == State.UNINITIALIZED,
+        "Can't add side inputs: DoFnTester is already initialized, in state %s",
+        state);
     this.sideInputs = sideInputs;
-    resetState();
   }
 
   /**
@@ -123,6 +128,10 @@ public class DoFnTester<InputT, OutputT> {
    * that is used.
    */
   public <T> void setSideInput(PCollectionView<T> sideInput, BoundedWindow window, T value) {
+    checkState(
+        state == State.UNINITIALIZED,
+        "Can't add side inputs: DoFnTester is already initialized, in state %s",
+        state);
     Map<BoundedWindow, T> windowValues = (Map<BoundedWindow, T>) sideInputs.get(sideInput);
     if (windowValues == null) {
       windowValues = new HashMap<>();
@@ -132,10 +141,24 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
-   * Whether or not a {@link DoFnTester} should clone the {@link DoFn} under test.
+   * Controls when a {@link DoFnTester} clones the {@link DoFn} under test and how it manages
+   * the lifecycle of that {@link DoFn}.
    */
   public enum CloningBehavior {
-    CLONE,
+    /**
+     * Clone the {@link DoFn} and call {@link DoFn.Setup} every time a bundle starts; call {@link
+     * DoFn.Teardown} every time a bundle finishes.
+     */
+    CLONE_PER_BUNDLE,
+    /**
+     * Clone the {@link DoFn} and call {@link DoFn.Setup} on the first access; call {@link
+     * DoFn.Teardown} only explicitly.
+     */
+    CLONE_ONCE,
+    /**
+     * Do not clone the {@link DoFn}; call {@link DoFn.Setup} on the first access; call {@link
+     * DoFn.Teardown} only explicitly.
+     */
     DO_NOT_CLONE
   }
 
@@ -143,6 +166,7 @@ public class DoFnTester<InputT, OutputT> {
    * Instruct this {@link DoFnTester} whether or not to clone the {@link DoFn} under test.
    */
   public void setCloningBehavior(CloningBehavior newValue) {
+    checkState(state == State.UNINITIALIZED, "Wrong state: %s", state);
     this.cloningBehavior = newValue;
   }
 
@@ -187,11 +211,17 @@ public class DoFnTester<InputT, OutputT> {
   /**
    * Calls the {@link DoFn.StartBundle} method on the {@link DoFn} under test.
    *
-   * <p>If needed, first creates a fresh instance of the {@link DoFn} under test.
+   * <p>If needed, first creates a fresh instance of the {@link DoFn} under test and calls
+   * {@link DoFn.Setup}.
    */
   public void startBundle() throws Exception {
-    resetState();
-    initializeState();
+    checkState(
+        state == State.UNINITIALIZED || state == State.BUNDLE_FINISHED,
+        "Wrong state during startBundle: %s",
+        state);
+    if (state == State.UNINITIALIZED) {
+      initializeState();
+    }
     TestContext<InputT, OutputT> context = createContext(fn);
     context.setupDelegateAggregators();
     try {
@@ -199,7 +229,7 @@ public class DoFnTester<InputT, OutputT> {
     } catch (UserCodeException e) {
       unwrapUserCodeException(e);
     }
-    state = State.STARTED;
+    state = State.BUNDLE_STARTED;
   }
 
   private static void unwrapUserCodeException(UserCodeException e) throws Exception {
@@ -236,15 +266,10 @@ public class DoFnTester<InputT, OutputT> {
    * already been called.
    *
    * <p>If the input timestamp is {@literal null}, the minimum timestamp will be used.
-   *
-   * @throws IllegalStateException if the {@code OldDoFn} under test has already
-   * been finished
    */
   public void processTimestampedElement(TimestampedValue<InputT> element) throws Exception {
     checkNotNull(element, "Timestamped element cannot be null");
-    checkState(state != State.FINISHED, "finishBundle() has already been called");
-
-    if (state == State.UNSTARTED) {
+    if (state != State.BUNDLE_STARTED) {
       startBundle();
     }
     try {
@@ -257,25 +282,30 @@ public class DoFnTester<InputT, OutputT> {
   /**
    * Calls the {@link DoFn.FinishBundle} method of the {@link DoFn} under test.
    *
-   * <p>Will call {@link #startBundle} automatically, if it hasn't
-   * already been called.
+   * <p>If {@link #setCloningBehavior} was called with {@link CloningBehavior#CLONE_PER_BUNDLE},
+   * then also calls {@link DoFn.Teardown} on the {@link DoFn}; a fresh clone is then created and
+   * its {@link DoFn.Setup} method is called when the next bundle starts.
    *
-   * @throws IllegalStateException if the {@link DoFn} under test has already
-   * been finished
+   * @throws IllegalStateException if {@link DoFn.FinishBundle} has already been called
+   * for this bundle.
    */
   public void finishBundle() throws Exception {
-    if (state == State.FINISHED) {
-      throw new IllegalStateException("finishBundle() has already been called");
-    }
-    if (state == State.UNSTARTED) {
-      startBundle();
-    }
+    checkState(
+        state == State.BUNDLE_STARTED,
+        "Must be inside bundle to call finishBundle, but was: %s",
+        state);
     try {
       fn.finishBundle(createContext(fn));
     } catch (UserCodeException e) {
       unwrapUserCodeException(e);
     }
-    state = State.FINISHED;
+    if (cloningBehavior == CloningBehavior.CLONE_PER_BUNDLE) {
+      fn.teardown();
+      fn = null;
+      state = State.UNINITIALIZED;
+    } else {
+      state = State.BUNDLE_FINISHED;
+    }
   }
 
   /**
@@ -695,13 +725,26 @@ public class DoFnTester<InputT, OutputT> {
     }
   }
 
+  @Override
+  public void close() throws Exception {
+    if (state == State.BUNDLE_STARTED) {
+      finishBundle();
+    }
+    if (state == State.BUNDLE_FINISHED) {
+      fn.teardown();
+      fn = null;
+    }
+    state = State.TORN_DOWN;
+  }
+
   /////////////////////////////////////////////////////////////////////////////
 
   /** The possible states of processing a {@link DoFn}. */
-  enum State {
-    UNSTARTED,
-    STARTED,
-    FINISHED
+  private enum State {
+    UNINITIALIZED,
+    BUNDLE_STARTED,
+    BUNDLE_FINISHED,
+    TORN_DOWN
   }
 
   private final PipelineOptions options = PipelineOptionsFactory.create();
@@ -714,7 +757,7 @@ public class DoFnTester<InputT, OutputT> {
    *
    * <p>Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be.
    */
-  private CloningBehavior cloningBehavior = CloningBehavior.CLONE;
+  private CloningBehavior cloningBehavior = CloningBehavior.CLONE_ONCE;
 
   /** The side input values to provide to the {@link DoFn} under test. */
   private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs =
@@ -732,22 +775,16 @@ public class DoFnTester<InputT, OutputT> {
   private Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
 
   /** The state of processing of the {@link DoFn} under test. */
-  private State state;
+  private State state = State.UNINITIALIZED;
 
   private DoFnTester(OldDoFn<InputT, OutputT> origFn) {
     this.origFn = origFn;
-    resetState();
-  }
-
-  private void resetState() {
-    fn = null;
-    outputs = null;
-    accumulators = null;
-    state = State.UNSTARTED;
   }
 
   @SuppressWarnings("unchecked")
-  private void initializeState() {
+  private void initializeState() throws Exception {
+    checkState(state == State.UNINITIALIZED, "Already initialized");
+    checkState(fn == null, "Uninitialized but fn != null");
     if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) {
       fn = origFn;
     } else {
@@ -756,6 +793,7 @@ public class DoFnTester<InputT, OutputT> {
               SerializableUtils.serializeToByteArray(origFn),
               origFn.toString());
     }
+    fn.setup();
     outputs = new HashMap<>();
     accumulators = new HashMap<>();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bef0e9de/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 3ed30fd..f208488 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -17,15 +17,17 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItems;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -51,122 +53,180 @@ public class DoFnTesterTest {
 
   @Test
   public void processElement() throws Exception {
-    CounterDoFn counterDoFn = new CounterDoFn();
-    DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
-
-    tester.processElement(1L);
-
-    List<String> take = tester.takeOutputElements();
+    for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
+      try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
+        tester.setCloningBehavior(cloning);
+        tester.processElement(1L);
 
-    assertThat(take, hasItems("1"));
+        List<String> take = tester.takeOutputElements();
 
-    // Following takeOutputElements(), neither takeOutputElements()
-    // nor peekOutputElements() return anything.
-    assertTrue(tester.takeOutputElements().isEmpty());
-    assertTrue(tester.peekOutputElements().isEmpty());
+        assertThat(take, hasItems("1"));
 
-    // processElement() caused startBundle() to be called, but finishBundle() was never called.
-    CounterDoFn deserializedDoFn = (CounterDoFn) tester.fn;
-    assertTrue(deserializedDoFn.wasStartBundleCalled());
-    assertFalse(deserializedDoFn.wasFinishBundleCalled());
+        // Following takeOutputElements(), neither takeOutputElements()
+        // nor peekOutputElements() return anything.
+        assertTrue(tester.takeOutputElements().isEmpty());
+        assertTrue(tester.peekOutputElements().isEmpty());
+      }
+    }
   }
 
   @Test
   public void processElementsWithPeeks() throws Exception {
-    CounterDoFn counterDoFn = new CounterDoFn();
-    DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
-
-    // Explicitly call startBundle().
-    tester.startBundle();
-
-    // verify startBundle() was called but not finishBundle().
-    CounterDoFn deserializedDoFn = (CounterDoFn) tester.fn;
-    assertTrue(deserializedDoFn.wasStartBundleCalled());
-    assertFalse(deserializedDoFn.wasFinishBundleCalled());
-
-    // process a couple of elements.
-    tester.processElement(1L);
-    tester.processElement(2L);
-
-    // peek the first 2 outputs.
-    List<String> peek = tester.peekOutputElements();
-    assertThat(peek, hasItems("1", "2"));
-
-    // process a couple more.
-    tester.processElement(3L);
-    tester.processElement(4L);
-
-    // peek all the outputs so far.
-    peek = tester.peekOutputElements();
-    assertThat(peek, hasItems("1", "2", "3", "4"));
-    // take the outputs.
-    List<String> take = tester.takeOutputElements();
-    assertThat(take, hasItems("1", "2", "3", "4"));
-
-    // Following takeOutputElements(), neither takeOutputElements()
-    // nor peekOutputElements() return anything.
-    assertTrue(tester.peekOutputElements().isEmpty());
-    assertTrue(tester.takeOutputElements().isEmpty());
-
-    // verify finishBundle() hasn't been called yet.
-    assertTrue(deserializedDoFn.wasStartBundleCalled());
-    assertFalse(deserializedDoFn.wasFinishBundleCalled());
-
-    // process a couple more.
-    tester.processElement(5L);
-    tester.processElement(6L);
-
-    // peek and take now have only the 2 last outputs.
-    peek = tester.peekOutputElements();
-    assertThat(peek, hasItems("5", "6"));
-    take = tester.takeOutputElements();
-    assertThat(take, hasItems("5", "6"));
-
-    tester.finishBundle();
-
-    // verify finishBundle() was called.
-    assertTrue(deserializedDoFn.wasStartBundleCalled());
-    assertTrue(deserializedDoFn.wasFinishBundleCalled());
+    for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
+      try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
+        tester.setCloningBehavior(cloning);
+        // Explicitly call startBundle().
+        tester.startBundle();
+
+        // process a couple of elements.
+        tester.processElement(1L);
+        tester.processElement(2L);
+
+        // peek the first 2 outputs.
+        List<String> peek = tester.peekOutputElements();
+        assertThat(peek, hasItems("1", "2"));
+
+        // process a couple more.
+        tester.processElement(3L);
+        tester.processElement(4L);
+
+        // peek all the outputs so far.
+        peek = tester.peekOutputElements();
+        assertThat(peek, hasItems("1", "2", "3", "4"));
+        // take the outputs.
+        List<String> take = tester.takeOutputElements();
+        assertThat(take, hasItems("1", "2", "3", "4"));
+
+        // Following takeOutputElements(), neither takeOutputElements()
+        // nor peekOutputElements() return anything.
+        assertTrue(tester.peekOutputElements().isEmpty());
+        assertTrue(tester.takeOutputElements().isEmpty());
+
+        // process a couple more.
+        tester.processElement(5L);
+        tester.processElement(6L);
+
+        // peek and take now have only the 2 last outputs.
+        peek = tester.peekOutputElements();
+        assertThat(peek, hasItems("5", "6"));
+        take = tester.takeOutputElements();
+        assertThat(take, hasItems("5", "6"));
+
+        tester.finishBundle();
+      }
+    }
   }
 
   @Test
-  public void processElementAfterFinish() throws Exception {
-    DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn());
-    tester.finishBundle();
+  public void processBundle() throws Exception {
+    for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
+      try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
+        tester.setCloningBehavior(cloning);
+        // processBundle() returns all the output like takeOutputElements().
+        assertThat(tester.processBundle(1L, 2L, 3L, 4L), hasItems("1", "2", "3", "4"));
+
+        // peek now returns nothing.
+        assertTrue(tester.peekOutputElements().isEmpty());
+      }
+    }
+  }
 
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("finishBundle() has already been called");
-    tester.processElement(1L);
+  @Test
+  public void processMultipleBundles() throws Exception {
+    for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
+      try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
+        tester.setCloningBehavior(cloning);
+        // processBundle() returns all the output like takeOutputElements().
+        assertThat(tester.processBundle(1L, 2L, 3L, 4L), hasItems("1", "2", "3", "4"));
+        assertThat(tester.processBundle(5L, 6L, 7L), hasItems("5", "6", "7"));
+        assertThat(tester.processBundle(8L, 9L), hasItems("8", "9"));
+
+        // peek now returns nothing.
+        assertTrue(tester.peekOutputElements().isEmpty());
+      }
+    }
   }
 
   @Test
-  public void processBatch() throws Exception {
-    CounterDoFn counterDoFn = new CounterDoFn();
-    DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
+  public void doNotClone() throws Exception {
+    final AtomicInteger numSetupCalls = new AtomicInteger();
+    final AtomicInteger numTeardownCalls = new AtomicInteger();
+    DoFn<Long, String> fn =
+        new DoFn<Long, String>() {
+          @ProcessElement
+          public void process(ProcessContext context) {}
+
+          @Setup
+          public void setup() {
+            numSetupCalls.addAndGet(1);
+          }
+
+          @Teardown
+          public void teardown() {
+            numTeardownCalls.addAndGet(1);
+          }
+        };
+
+    try (DoFnTester<Long, String> tester = DoFnTester.of(fn)) {
+      tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE);
+
+      tester.processBundle(1L, 2L, 3L);
+      tester.processBundle(4L, 5L);
+      tester.processBundle(6L);
+    }
+    assertEquals(1, numSetupCalls.get());
+    assertEquals(1, numTeardownCalls.get());
+  }
 
-    // processBundle() returns all the output like takeOutputElements().
-    List<String> take = tester.processBundle(1L, 2L, 3L, 4L);
+  private static class CountBundleCallsFn extends DoFn<Long, String> {
+    private int numStartBundleCalls = 0;
+    private int numFinishBundleCalls = 0;
 
-    assertThat(take, hasItems("1", "2", "3", "4"));
+    @ProcessElement
+    public void process(ProcessContext context) {
+      context.output(numStartBundleCalls + "/" + numFinishBundleCalls);
+    }
 
-    // peek now returns nothing.
-    assertTrue(tester.peekOutputElements().isEmpty());
+    @StartBundle
+    public void startBundle(Context context) {
+      ++numStartBundleCalls;
+    }
 
-    // verify startBundle() and finishBundle() were both called.
-    CounterDoFn deserializedDoFn = (CounterDoFn) tester.fn;
-    assertTrue(deserializedDoFn.wasStartBundleCalled());
-    assertTrue(deserializedDoFn.wasFinishBundleCalled());
+    @FinishBundle
+    public void finishBundle(Context context) {
+      ++numFinishBundleCalls;
+    }
   }
 
   @Test
-  public void processTimestampedElement() throws Exception {
-    DoFn<Long, TimestampedValue<Long>> reifyTimestamps = new ReifyTimestamps();
+  public void cloneOnce() throws Exception {
+    try (DoFnTester<Long, String> tester = DoFnTester.of(new CountBundleCallsFn())) {
+      tester.setCloningBehavior(DoFnTester.CloningBehavior.CLONE_ONCE);
+
+      assertThat(tester.processBundle(1L, 2L, 3L), contains("1/0", "1/0", "1/0"));
+      assertThat(tester.processBundle(4L, 5L), contains("2/1", "2/1"));
+      assertThat(tester.processBundle(6L), contains("3/2"));
+    }
+  }
+
+  @Test
+  public void clonePerBundle() throws Exception {
+    try (DoFnTester<Long, String> tester = DoFnTester.of(new CountBundleCallsFn())) {
+      tester.setCloningBehavior(DoFnTester.CloningBehavior.CLONE_PER_BUNDLE);
 
-    DoFnTester<Long, TimestampedValue<Long>> tester = DoFnTester.of(reifyTimestamps);
+      assertThat(tester.processBundle(1L, 2L, 3L), contains("1/0", "1/0", "1/0"));
+      assertThat(tester.processBundle(4L, 5L), contains("1/0", "1/0"));
+      assertThat(tester.processBundle(6L), contains("1/0"));
+    }
+  }
 
-    TimestampedValue<Long> input = TimestampedValue.of(1L, new Instant(100));
-    tester.processTimestampedElement(input);
-    assertThat(tester.takeOutputElements(), contains(input));
+  @Test
+  public void processTimestampedElement() throws Exception {
+    try (DoFnTester<Long, TimestampedValue<Long>> tester = DoFnTester.of(new ReifyTimestamps())) {
+      TimestampedValue<Long> input = TimestampedValue.of(1L, new Instant(100));
+      tester.processTimestampedElement(input);
+      assertThat(tester.takeOutputElements(), contains(input));
+    }
   }
 
   static class ReifyTimestamps extends DoFn<Long, TimestampedValue<Long>> {
@@ -178,86 +238,83 @@ public class DoFnTesterTest {
 
   @Test
   public void processElementWithOutputTimestamp() throws Exception {
-    CounterDoFn counterDoFn = new CounterDoFn();
-    DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
-
-    tester.processElement(1L);
-    tester.processElement(2L);
-
-    List<TimestampedValue<String>> peek = tester.peekOutputElementsWithTimestamp();
-    TimestampedValue<String> one = TimestampedValue.of("1", new Instant(1000L));
-    TimestampedValue<String> two = TimestampedValue.of("2", new Instant(2000L));
-    assertThat(peek, hasItems(one, two));
-
-    tester.processElement(3L);
-    tester.processElement(4L);
-
-    TimestampedValue<String> three = TimestampedValue.of("3", new Instant(3000L));
-    TimestampedValue<String> four = TimestampedValue.of("4", new Instant(4000L));
-    peek = tester.peekOutputElementsWithTimestamp();
-    assertThat(peek, hasItems(one, two, three, four));
-    List<TimestampedValue<String>> take = tester.takeOutputElementsWithTimestamp();
-    assertThat(take, hasItems(one, two, three, four));
-
-    // Following takeOutputElementsWithTimestamp(), neither takeOutputElementsWithTimestamp()
-    // nor peekOutputElementsWithTimestamp() return anything.
-    assertTrue(tester.takeOutputElementsWithTimestamp().isEmpty());
-    assertTrue(tester.peekOutputElementsWithTimestamp().isEmpty());
-
-    // peekOutputElements() and takeOutputElements() also return nothing.
-    assertTrue(tester.peekOutputElements().isEmpty());
-    assertTrue(tester.takeOutputElements().isEmpty());
+    try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
+      tester.processElement(1L);
+      tester.processElement(2L);
+
+      List<TimestampedValue<String>> peek = tester.peekOutputElementsWithTimestamp();
+      TimestampedValue<String> one = TimestampedValue.of("1", new Instant(1000L));
+      TimestampedValue<String> two = TimestampedValue.of("2", new Instant(2000L));
+      assertThat(peek, hasItems(one, two));
+
+      tester.processElement(3L);
+      tester.processElement(4L);
+
+      TimestampedValue<String> three = TimestampedValue.of("3", new Instant(3000L));
+      TimestampedValue<String> four = TimestampedValue.of("4", new Instant(4000L));
+      peek = tester.peekOutputElementsWithTimestamp();
+      assertThat(peek, hasItems(one, two, three, four));
+      List<TimestampedValue<String>> take = tester.takeOutputElementsWithTimestamp();
+      assertThat(take, hasItems(one, two, three, four));
+
+      // Following takeOutputElementsWithTimestamp(), neither takeOutputElementsWithTimestamp()
+      // nor peekOutputElementsWithTimestamp() return anything.
+      assertTrue(tester.takeOutputElementsWithTimestamp().isEmpty());
+      assertTrue(tester.peekOutputElementsWithTimestamp().isEmpty());
+
+      // peekOutputElements() and takeOutputElements() also return nothing.
+      assertTrue(tester.peekOutputElements().isEmpty());
+      assertTrue(tester.takeOutputElements().isEmpty());
+    }
   }
 
   @Test
   public void getAggregatorValuesShouldGetValueOfCounter() throws Exception {
     CounterDoFn counterDoFn = new CounterDoFn();
-    DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
-    tester.processBundle(1L, 2L, 4L, 8L);
-
-    Long aggregatorVal = tester.getAggregatorValue(counterDoFn.agg);
-
-    assertThat(aggregatorVal, equalTo(15L));
+    try (DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn)) {
+      tester.processBundle(1L, 2L, 4L, 8L);
+      assertThat(tester.getAggregatorValue(counterDoFn.agg), equalTo(15L));
+    }
   }
 
   @Test
   public void getAggregatorValuesWithEmptyCounterShouldSucceed() throws Exception {
     CounterDoFn counterDoFn = new CounterDoFn();
-    DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
-    tester.processBundle();
-    Long aggregatorVal = tester.getAggregatorValue(counterDoFn.agg);
-    // empty bundle
-    assertThat(aggregatorVal, equalTo(0L));
+    try (DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn)) {
+      tester.processBundle();
+      // empty bundle
+      assertThat(tester.getAggregatorValue(counterDoFn.agg), equalTo(0L));
+    }
   }
 
   @Test
   public void getAggregatorValuesInStartFinishBundleShouldGetValues() throws Exception {
-    CounterDoFn fn = new CounterDoFn(1L, 2L);
-    DoFnTester<Long, String> tester = DoFnTester.of(fn);
-    tester.processBundle(0L, 0L);
+    CounterDoFn fn = new CounterDoFn();
+    try (DoFnTester<Long, String> tester = DoFnTester.of(fn)) {
+      tester.processBundle(1L, 2L, 3L, 4L);
 
-    Long aggValue = tester.getAggregatorValue(fn.agg);
-    assertThat(aggValue, equalTo(1L + 2L));
+      assertThat(tester.getAggregatorValue(fn.startBundleCalls), equalTo(1L));
+      assertThat(tester.getAggregatorValue(fn.finishBundleCalls), equalTo(1L));
+    }
   }
 
   @Test
   public void peekValuesInWindow() throws Exception {
-    CounterDoFn fn = new CounterDoFn(1L, 2L);
-    DoFnTester<Long, String> tester = DoFnTester.of(fn);
-
-    tester.startBundle();
-    tester.processElement(1L);
-    tester.processElement(2L);
-    tester.finishBundle();
-
-    assertThat(
-        tester.peekOutputElementsInWindow(GlobalWindow.INSTANCE),
-        containsInAnyOrder(
-            TimestampedValue.of("1", new Instant(1000L)),
-            TimestampedValue.of("2", new Instant(2000L))));
-    assertThat(
-        tester.peekOutputElementsInWindow(new IntervalWindow(new Instant(0L), new Instant(10L))),
-        Matchers.<TimestampedValue<String>>emptyIterable());
+    try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
+      tester.startBundle();
+      tester.processElement(1L);
+      tester.processElement(2L);
+      tester.finishBundle();
+
+      assertThat(
+          tester.peekOutputElementsInWindow(GlobalWindow.INSTANCE),
+          containsInAnyOrder(
+              TimestampedValue.of("1", new Instant(1000L)),
+              TimestampedValue.of("2", new Instant(2000L))));
+      assertThat(
+          tester.peekOutputElementsInWindow(new IntervalWindow(new Instant(0L), new Instant(10L))),
+          Matchers.<TimestampedValue<String>>emptyIterable());
+    }
   }
 
   @Test
@@ -265,15 +322,14 @@ public class DoFnTesterTest {
     final PCollectionView<Integer> value =
         PCollectionViews.singletonView(
             TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
-    OldDoFn<Integer, Integer> fn = new SideInputDoFn(value);
-
-    DoFnTester<Integer, Integer> tester = DoFnTester.of(fn);
 
-    tester.processElement(1);
-    tester.processElement(2);
-    tester.processElement(4);
-    tester.processElement(8);
-    assertThat(tester.peekOutputElements(), containsInAnyOrder(0, 0, 0, 0));
+    try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
+      tester.processElement(1);
+      tester.processElement(2);
+      tester.processElement(4);
+      tester.processElement(8);
+      assertThat(tester.peekOutputElements(), containsInAnyOrder(0, 0, 0, 0));
+    }
   }
 
   @Test
@@ -281,17 +337,17 @@ public class DoFnTesterTest {
     final PCollectionView<Integer> value =
         PCollectionViews.singletonView(
             TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
-    OldDoFn<Integer, Integer> fn = new SideInputDoFn(value);
 
-    DoFnTester<Integer, Integer> tester = DoFnTester.of(fn);
-    tester.setSideInput(value, GlobalWindow.INSTANCE, -2);
-    tester.processElement(16);
-    tester.processElement(32);
-    tester.processElement(64);
-    tester.processElement(128);
-    tester.finishBundle();
+    try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
+      tester.setSideInput(value, GlobalWindow.INSTANCE, -2);
+      tester.processElement(16);
+      tester.processElement(32);
+      tester.processElement(64);
+      tester.processElement(128);
+      tester.finishBundle();
 
-    assertThat(tester.peekOutputElements(), containsInAnyOrder(-2, -2, -2, -2));
+      assertThat(tester.peekOutputElements(), containsInAnyOrder(-2, -2, -2, -2));
+    }
   }
 
   private static class SideInputDoFn extends OldDoFn<Integer, Integer> {
@@ -308,50 +364,56 @@ public class DoFnTesterTest {
   }
 
   /**
-   * An {@link OldDoFn} that adds values to an aggregator and converts input to String in
+   * A {@link DoFn} that adds values to an aggregator and converts input to String in
    * {@link OldDoFn#processElement).
    */
-  private static class CounterDoFn extends OldDoFn<Long, String> {
+  private static class CounterDoFn extends DoFn<Long, String> {
     Aggregator<Long, Long> agg = createAggregator("ctr", new Sum.SumLongFn());
-    private final long startBundleVal;
-    private final long finishBundleVal;
-    private boolean startBundleCalled;
-    private boolean finishBundleCalled;
-
-    public CounterDoFn() {
-      this(0L, 0L);
+    Aggregator<Long, Long> startBundleCalls =
+        createAggregator("startBundleCalls", new Sum.SumLongFn());
+    Aggregator<Long, Long> finishBundleCalls =
+        createAggregator("finishBundleCalls", new Sum.SumLongFn());
+
+    private enum LifecycleState {
+      UNINITIALIZED,
+      SET_UP,
+      INSIDE_BUNDLE,
+      TORN_DOWN
     }
+    private LifecycleState state = LifecycleState.UNINITIALIZED;
 
-    public CounterDoFn(long start, long finish) {
-      this.startBundleVal = start;
-      this.finishBundleVal = finish;
+    @Setup
+    public void setup() {
+      checkState(state == LifecycleState.UNINITIALIZED, "Wrong state: %s", state);
+      state = LifecycleState.SET_UP;
     }
 
-    @Override
+    @StartBundle
     public void startBundle(Context c) {
-      agg.addValue(startBundleVal);
-      startBundleCalled = true;
+      checkState(state == LifecycleState.SET_UP, "Wrong state: %s", state);
+      state = LifecycleState.INSIDE_BUNDLE;
+      startBundleCalls.addValue(1L);
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
+      checkState(state == LifecycleState.INSIDE_BUNDLE, "Wrong state: %s", state);
       agg.addValue(c.element());
       Instant instant = new Instant(1000L * c.element());
       c.outputWithTimestamp(c.element().toString(), instant);
     }
 
-    @Override
+    @FinishBundle
     public void finishBundle(Context c) {
-      agg.addValue(finishBundleVal);
-      finishBundleCalled = true;
-    }
-
-    boolean wasStartBundleCalled() {
-      return startBundleCalled;
+      checkState(state == LifecycleState.INSIDE_BUNDLE, "Wrong state: %s", state);
+      state = LifecycleState.SET_UP;
+      finishBundleCalls.addValue(1L);
     }
 
-    boolean wasFinishBundleCalled() {
-      return finishBundleCalled;
+    @Teardown
+    public void teardown() {
+      checkState(state == LifecycleState.SET_UP, "Wrong state: %s", state);
+      state = LifecycleState.TORN_DOWN;
     }
   }
 }