You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/15 21:17:09 UTC

[4/5] incubator-beam git commit: Move ParDo Lifecycle tests to their own file

Move ParDo Lifecycle tests to their own file

These tests are not yet functional in all runners, and this makes them
easier to ignore.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/29cbdceb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/29cbdceb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/29cbdceb

Branch: refs/heads/master
Commit: 29cbdceb5b78ce86ad0d90050d7542b0d5b45362
Parents: 12abb1b
Author: Thomas Groh <tg...@google.com>
Authored: Thu Aug 11 10:45:43 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Aug 15 14:16:54 2016 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  10 +
 .../beam/sdk/transforms/ParDoLifecycleTest.java | 448 +++++++++++++++++++
 .../apache/beam/sdk/transforms/ParDoTest.java   | 405 -----------------
 3 files changed, 458 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29cbdceb/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 86991b7..c32e184 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -60,6 +60,16 @@
             <beamUseDummyRunner>true</beamUseDummyRunner>
           </systemPropertyVariables>
         </configuration>
+        <executions>
+          <execution>
+            <id>runnable-on-service-tests</id>
+            <configuration>
+              <excludes>
+                <exclude>org/apache/beam/sdk/transforms/ParDoLifecycleTest.java</exclude>
+              </excludes>
+            </configuration>
+          </execution>
+        </executions>
       </plugin>
 
       <!-- Run CheckStyle pass on transforms, as they are release in

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29cbdceb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
new file mode 100644
index 0000000..272fea7
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tests that {@link ParDo} exercises {@link DoFn} methods in the appropriate sequence.
+ */
+@RunWith(JUnit4.class)
+public class ParDoLifecycleTest {
+  @Test
+  @Category(RunnableOnService.class)
+  public void testOldFnCallSequence() {
+    TestPipeline p = TestPipeline.create();
+    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
+        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
+        .apply(Flatten.<Integer>pCollections())
+        .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>()));
+
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testOldFnCallSequenceMulti() {
+    TestPipeline p = TestPipeline.create();
+    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
+        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
+        .apply(Flatten.<Integer>pCollections())
+        .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>())
+            .withOutputTags(new TupleTag<Integer>() {}, TupleTagList.empty()));
+
+    p.run();
+  }
+
+  private static class CallSequenceEnforcingOldFn<T> extends OldDoFn<T, T> {
+    private boolean setupCalled = false;
+    private int startBundleCalls = 0;
+    private int finishBundleCalls = 0;
+    private boolean teardownCalled = false;
+
+    @Override
+    public void setup() {
+      assertThat("setup should not be called twice", setupCalled, is(false));
+      assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0));
+      assertThat("setup should be called before finishBundle", finishBundleCalls, equalTo(0));
+      assertThat("setup should be called before teardown", teardownCalled, is(false));
+      setupCalled = true;
+    }
+
+    @Override
+    public void startBundle(Context c) {
+      assertThat("setup should have been called", setupCalled, is(true));
+      assertThat(
+          "Even number of startBundle and finishBundle calls in startBundle",
+          startBundleCalls,
+          equalTo(finishBundleCalls));
+      assertThat("teardown should not have been called", teardownCalled, is(false));
+      startBundleCalls++;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
+      assertThat(
+          "there should be one startBundle call with no call to finishBundle",
+          startBundleCalls,
+          equalTo(finishBundleCalls + 1));
+      assertThat("teardown should not have been called", teardownCalled, is(false));
+    }
+
+    @Override
+    public void finishBundle(Context c) {
+      assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
+      assertThat(
+          "there should be one bundle that has been started but not finished",
+          startBundleCalls,
+          equalTo(finishBundleCalls + 1));
+      assertThat("teardown should not have been called", teardownCalled, is(false));
+      finishBundleCalls++;
+    }
+
+    @Override
+    public void teardown() {
+      assertThat(setupCalled, is(true));
+      assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
+      assertThat(teardownCalled, is(false));
+      teardownCalled = true;
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testFnCallSequence() {
+    TestPipeline p = TestPipeline.create();
+    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
+        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
+        .apply(Flatten.<Integer>pCollections())
+        .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>()));
+
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testFnCallSequenceMulti() {
+    TestPipeline p = TestPipeline.create();
+    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
+        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
+        .apply(Flatten.<Integer>pCollections())
+        .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>())
+            .withOutputTags(new TupleTag<Integer>() {
+            }, TupleTagList.empty()));
+
+    p.run();
+  }
+
+  private static class CallSequenceEnforcingFn<T> extends DoFn<T, T> {
+    private boolean setupCalled = false;
+    private int startBundleCalls = 0;
+    private int finishBundleCalls = 0;
+    private boolean teardownCalled = false;
+
+    @Setup
+    public void before() {
+      assertThat("setup should not be called twice", setupCalled, is(false));
+      assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0));
+      assertThat("setup should be called before finishBundle", finishBundleCalls, equalTo(0));
+      assertThat("setup should be called before teardown", teardownCalled, is(false));
+      setupCalled = true;
+    }
+
+    @StartBundle
+    public void begin(Context c) {
+      assertThat("setup should have been called", setupCalled, is(true));
+      assertThat("Even number of startBundle and finishBundle calls in startBundle",
+          startBundleCalls,
+          equalTo(finishBundleCalls));
+      assertThat("teardown should not have been called", teardownCalled, is(false));
+      startBundleCalls++;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) throws Exception {
+      assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
+      assertThat("there should be one startBundle call with no call to finishBundle",
+          startBundleCalls,
+          equalTo(finishBundleCalls + 1));
+      assertThat("teardown should not have been called", teardownCalled, is(false));
+    }
+
+    @FinishBundle
+    public void end(Context c) {
+      assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
+      assertThat("there should be one bundle that has been started but not finished",
+          startBundleCalls,
+          equalTo(finishBundleCalls + 1));
+      assertThat("teardown should not have been called", teardownCalled, is(false));
+      finishBundleCalls++;
+    }
+
+    @Teardown
+    public void after() {
+      assertThat(setupCalled, is(true));
+      assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
+      assertThat(teardownCalled, is(false));
+      teardownCalled = true;
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testTeardownCalledAfterExceptionInSetup() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP);
+    p
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat(
+          "Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testTeardownCalledAfterExceptionInStartBundle() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
+    p
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat(
+          "Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testTeardownCalledAfterExceptionInProcessElement() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
+    p
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat(
+          "Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testTeardownCalledAfterExceptionInFinishBundle() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
+    p
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat(
+          "Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWithContextTeardownCalledAfterExceptionInSetup() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP);
+    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat("Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWithContextTeardownCalledAfterExceptionInStartBundle() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
+    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat("Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWithContextTeardownCalledAfterExceptionInProcessElement() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
+    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat("Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWithContextTeardownCalledAfterExceptionInFinishBundle() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
+    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat("Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  private static class ExceptionThrowingOldFn extends OldDoFn<Object, Object> {
+    static AtomicBoolean teardownCalled = new AtomicBoolean(false);
+
+    private final MethodForException toThrow;
+    private boolean thrown;
+
+    private ExceptionThrowingOldFn(MethodForException toThrow) {
+      this.toThrow = toThrow;
+    }
+
+    @Override
+    public void setup() throws Exception {
+      throwIfNecessary(MethodForException.SETUP);
+    }
+
+    @Override
+    public void startBundle(Context c) throws Exception {
+      throwIfNecessary(MethodForException.START_BUNDLE);
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      throwIfNecessary(MethodForException.PROCESS_ELEMENT);
+    }
+
+    @Override
+    public void finishBundle(Context c) throws Exception {
+      throwIfNecessary(MethodForException.FINISH_BUNDLE);
+    }
+
+    private void throwIfNecessary(MethodForException method) throws Exception {
+      if (toThrow == method && !thrown) {
+        thrown = true;
+        throw new Exception("Hasn't yet thrown");
+      }
+    }
+
+    @Override
+    public void teardown() {
+      if (!thrown) {
+        fail("Excepted to have a processing method throw an exception");
+      }
+      teardownCalled.set(true);
+    }
+  }
+
+
+  private static class ExceptionThrowingFn extends DoFn<Object, Object> {
+    static AtomicBoolean teardownCalled = new AtomicBoolean(false);
+
+    private final MethodForException toThrow;
+    private boolean thrown;
+
+    private ExceptionThrowingFn(MethodForException toThrow) {
+      this.toThrow = toThrow;
+    }
+
+    @Setup
+    public void before() throws Exception {
+      throwIfNecessary(MethodForException.SETUP);
+    }
+
+    @StartBundle
+    public void preBundle(Context c) throws Exception {
+      throwIfNecessary(MethodForException.START_BUNDLE);
+    }
+
+    @ProcessElement
+    public void perElement(ProcessContext c) throws Exception {
+      throwIfNecessary(MethodForException.PROCESS_ELEMENT);
+    }
+
+    @FinishBundle
+    public void postBundle(Context c) throws Exception {
+      throwIfNecessary(MethodForException.FINISH_BUNDLE);
+    }
+
+    private void throwIfNecessary(MethodForException method) throws Exception {
+      if (toThrow == method && !thrown) {
+        thrown = true;
+        throw new Exception("Hasn't yet thrown");
+      }
+    }
+
+    @Teardown
+    public void after() {
+      if (!thrown) {
+        fail("Excepted to have a processing method throw an exception");
+      }
+      teardownCalled.set(true);
+    }
+  }
+
+  private enum MethodForException {
+    SETUP,
+    START_BUNDLE,
+    PROCESS_ELEMENT,
+    FINISH_BUNDLE
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29cbdceb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 8460124..c384114 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -28,14 +28,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
@@ -54,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -80,7 +76,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Tests for ParDo.
@@ -1475,404 +1470,4 @@ public class ParDoTest implements Serializable {
     assertThat(displayData, includesDisplayDataFrom(fn));
     assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
   }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testFnCallSequence() {
-    TestPipeline p = TestPipeline.create();
-    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
-        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
-        .apply(Flatten.<Integer>pCollections())
-        .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>()));
-
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testFnCallSequenceMulti() {
-    TestPipeline p = TestPipeline.create();
-    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
-        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
-        .apply(Flatten.<Integer>pCollections())
-        .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>())
-                .withOutputTags(new TupleTag<Integer>() {}, TupleTagList.empty()));
-
-    p.run();
-  }
-
-  private static class CallSequenceEnforcingOldFn<T> extends OldDoFn<T, T> {
-    private boolean setupCalled = false;
-    private int startBundleCalls = 0;
-    private int finishBundleCalls = 0;
-    private boolean teardownCalled = false;
-
-    @Override
-    public void setup() {
-      assertThat("setup should not be called twice", setupCalled, is(false));
-      assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0));
-      assertThat("setup should be called before finishBundle", finishBundleCalls, equalTo(0));
-      assertThat("setup should be called before teardown", teardownCalled, is(false));
-      setupCalled = true;
-    }
-
-    @Override
-    public void startBundle(Context c) {
-      assertThat("setup should have been called", setupCalled, is(true));
-      assertThat(
-          "Even number of startBundle and finishBundle calls in startBundle",
-          startBundleCalls,
-          equalTo(finishBundleCalls));
-      assertThat("teardown should not have been called", teardownCalled, is(false));
-      startBundleCalls++;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
-      assertThat(
-          "there should be one startBundle call with no call to finishBundle",
-          startBundleCalls,
-          equalTo(finishBundleCalls + 1));
-      assertThat("teardown should not have been called", teardownCalled, is(false));
-    }
-
-    @Override
-    public void finishBundle(Context c) {
-      assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
-      assertThat(
-          "there should be one bundle that has been started but not finished",
-          startBundleCalls,
-          equalTo(finishBundleCalls + 1));
-      assertThat("teardown should not have been called", teardownCalled, is(false));
-      finishBundleCalls++;
-    }
-
-    @Override
-    public void teardown() {
-      assertThat(setupCalled, is(true));
-      assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
-      assertThat(teardownCalled, is(false));
-      teardownCalled = true;
-    }
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testFnWithContextCallSequence() {
-    TestPipeline p = TestPipeline.create();
-    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
-        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
-        .apply(Flatten.<Integer>pCollections())
-        .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>()));
-
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testFnWithContextCallSequenceMulti() {
-    TestPipeline p = TestPipeline.create();
-    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
-        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
-        .apply(Flatten.<Integer>pCollections())
-        .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>())
-            .withOutputTags(new TupleTag<Integer>() {
-            }, TupleTagList.empty()));
-
-    p.run();
-  }
-
-  private static class CallSequenceEnforcingFn<T> extends DoFn<T, T> {
-    private boolean setupCalled = false;
-    private int startBundleCalls = 0;
-    private int finishBundleCalls = 0;
-    private boolean teardownCalled = false;
-
-    @Setup
-    public void before() {
-      assertThat("setup should not be called twice", setupCalled, is(false));
-      assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0));
-      assertThat("setup should be called before finishBundle", finishBundleCalls, equalTo(0));
-      assertThat("setup should be called before teardown", teardownCalled, is(false));
-      setupCalled = true;
-    }
-
-    @StartBundle
-    public void begin(Context c) {
-      assertThat("setup should have been called", setupCalled, is(true));
-      assertThat("Even number of startBundle and finishBundle calls in startBundle",
-          startBundleCalls,
-          equalTo(finishBundleCalls));
-      assertThat("teardown should not have been called", teardownCalled, is(false));
-      startBundleCalls++;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) throws Exception {
-      assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
-      assertThat("there should be one startBundle call with no call to finishBundle",
-          startBundleCalls,
-          equalTo(finishBundleCalls + 1));
-      assertThat("teardown should not have been called", teardownCalled, is(false));
-    }
-
-    @FinishBundle
-    public void end(Context c) {
-      assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
-      assertThat("there should be one bundle that has been started but not finished",
-          startBundleCalls,
-          equalTo(finishBundleCalls + 1));
-      assertThat("teardown should not have been called", teardownCalled, is(false));
-      finishBundleCalls++;
-    }
-
-    @Teardown
-    public void after() {
-      assertThat(setupCalled, is(true));
-      assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
-      assertThat(teardownCalled, is(false));
-      teardownCalled = true;
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testTeardownCalledAfterExceptionInSetup() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP);
-    p
-        .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testTeardownCalledAfterExceptionInStartBundle() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
-    p
-        .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testTeardownCalledAfterExceptionInProcessElement() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
-    p
-        .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testTeardownCalledAfterExceptionInFinishBundle() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
-    p
-        .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWithContextTeardownCalledAfterExceptionInSetup() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP);
-    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat("Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWithContextTeardownCalledAfterExceptionInStartBundle() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
-    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat("Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWithContextTeardownCalledAfterExceptionInProcessElement() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
-    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat("Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWithContextTeardownCalledAfterExceptionInFinishBundle() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
-    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat("Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  private static class ExceptionThrowingOldFn extends OldDoFn<Object, Object> {
-    static AtomicBoolean teardownCalled = new AtomicBoolean(false);
-
-    private final MethodForException toThrow;
-    private boolean thrown;
-
-    private ExceptionThrowingOldFn(MethodForException toThrow) {
-      this.toThrow = toThrow;
-    }
-
-    @Override
-    public void setup() throws Exception {
-      throwIfNecessary(MethodForException.SETUP);
-    }
-
-    @Override
-    public void startBundle(Context c) throws Exception {
-      throwIfNecessary(MethodForException.START_BUNDLE);
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      throwIfNecessary(MethodForException.PROCESS_ELEMENT);
-    }
-
-    @Override
-    public void finishBundle(Context c) throws Exception {
-      throwIfNecessary(MethodForException.FINISH_BUNDLE);
-    }
-
-    private void throwIfNecessary(MethodForException method) throws Exception {
-      if (toThrow == method && !thrown) {
-        thrown = true;
-        throw new Exception("Hasn't yet thrown");
-      }
-    }
-
-    @Override
-    public void teardown() {
-      if (!thrown) {
-        fail("Excepted to have a processing method throw an exception");
-      }
-      teardownCalled.set(true);
-    }
-  }
-
-
-  private static class ExceptionThrowingFn extends DoFn<Object, Object> {
-    static AtomicBoolean teardownCalled = new AtomicBoolean(false);
-
-    private final MethodForException toThrow;
-    private boolean thrown;
-
-    private ExceptionThrowingFn(MethodForException toThrow) {
-      this.toThrow = toThrow;
-    }
-
-    @Setup
-    public void before() throws Exception {
-      throwIfNecessary(MethodForException.SETUP);
-    }
-
-    @StartBundle
-    public void preBundle(Context c) throws Exception {
-      throwIfNecessary(MethodForException.START_BUNDLE);
-    }
-
-    @ProcessElement
-    public void perElement(ProcessContext c) throws Exception {
-      throwIfNecessary(MethodForException.PROCESS_ELEMENT);
-    }
-
-    @FinishBundle
-    public void postBundle(Context c) throws Exception {
-      throwIfNecessary(MethodForException.FINISH_BUNDLE);
-    }
-
-    private void throwIfNecessary(MethodForException method) throws Exception {
-      if (toThrow == method && !thrown) {
-        thrown = true;
-        throw new Exception("Hasn't yet thrown");
-      }
-    }
-
-    @Teardown
-    public void after() {
-      if (!thrown) {
-        fail("Excepted to have a processing method throw an exception");
-      }
-      teardownCalled.set(true);
-    }
-  }
-
-  private enum MethodForException {
-    SETUP,
-    START_BUNDLE,
-    PROCESS_ELEMENT,
-    FINISH_BUNDLE
-  }
 }