You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/20 19:04:57 UTC

[07/17] incubator-beam git commit: Migrated the beam-runners-direct-java module to TestPipeline as a JUnit rule.

Migrated the beam-runners-direct-java module to TestPipeline as a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09c404a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09c404a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09c404a6

Branch: refs/heads/master
Commit: 09c404a6c407898fcbc2fd22797cba4da8839b93
Parents: b671025
Author: Stas Levin <st...@gmail.com>
Authored: Mon Dec 19 10:20:16 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 09:55:45 2016 -0800

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactoryTest.java       | 13 ++++++-------
 .../beam/runners/direct/CloningBundleFactoryTest.java |  8 ++------
 .../beam/runners/direct/CommittedResultTest.java      |  6 +++++-
 .../CopyOnAccessInMemoryStateInternalsTest.java       | 11 +++++++++--
 .../beam/runners/direct/DirectGraphVisitorTest.java   |  3 ++-
 .../beam/runners/direct/EvaluationContextTest.java    |  7 ++++---
 .../runners/direct/FlattenEvaluatorFactoryTest.java   |  6 ++++--
 .../direct/GroupByKeyEvaluatorFactoryTest.java        |  5 ++++-
 .../direct/GroupByKeyOnlyEvaluatorFactoryTest.java    |  5 ++++-
 .../direct/ImmutabilityCheckingBundleFactoryTest.java |  4 +++-
 .../direct/ImmutabilityEnforcementFactoryTest.java    |  3 ++-
 .../direct/ImmutableListBundleFactoryTest.java        | 14 +++++++++++---
 .../direct/KeyedPValueTrackingVisitorTest.java        |  6 +++---
 .../beam/runners/direct/ParDoEvaluatorTest.java       |  5 ++++-
 .../beam/runners/direct/SideInputContainerTest.java   |  5 +++--
 .../direct/StatefulParDoEvaluatorFactoryTest.java     |  7 +++++--
 .../beam/runners/direct/StepTransformResultTest.java  |  5 ++++-
 .../direct/TestStreamEvaluatorFactoryTest.java        |  5 ++++-
 .../beam/runners/direct/TransformExecutorTest.java    |  4 +++-
 .../direct/UnboundedReadEvaluatorFactoryTest.java     |  9 ++++-----
 .../beam/runners/direct/ViewEvaluatorFactoryTest.java |  5 ++++-
 .../runners/direct/WatermarkCallbackExecutorTest.java |  5 ++++-
 .../beam/runners/direct/WatermarkManagerTest.java     |  6 ++++--
 .../runners/direct/WindowEvaluatorFactoryTest.java    |  5 ++++-
 .../runners/direct/WriteWithShardingFactoryTest.java  | 14 +++++++-------
 25 files changed, 109 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index acb1444..97eae27 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -62,6 +62,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -82,11 +83,13 @@ public class BoundedReadEvaluatorFactoryTest {
   private BundleFactory bundleFactory;
   private AppliedPTransform<?, ?, ?> longsProducer;
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
     source = CountingSource.upTo(10L);
-    TestPipeline p = TestPipeline.create();
     longs = p.apply(Read.from(source));
 
     factory =
@@ -142,7 +145,7 @@ public class BoundedReadEvaluatorFactoryTest {
       elems[i] = (long) i;
     }
     PCollection<Long> read =
-        TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems)));
+        p.apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems)));
     AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read);
     Collection<CommittedBundle<?>> unreadInputs =
         new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1);
@@ -191,8 +194,7 @@ public class BoundedReadEvaluatorFactoryTest {
     BoundedReadEvaluatorFactory factory = new BoundedReadEvaluatorFactory(context, 0L);
 
     PCollection<Long> read =
-        TestPipeline.create()
-            .apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L))));
+        p.apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L))));
     AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read);
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
@@ -298,8 +300,6 @@ public class BoundedReadEvaluatorFactoryTest {
   @Test
   public void boundedSourceEvaluatorClosesReader() throws Exception {
     TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
-
-    TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);
 
@@ -320,7 +320,6 @@ public class BoundedReadEvaluatorFactoryTest {
   public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
     TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of());
 
-    TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index bafab59..e5299a2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -62,6 +62,8 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class CloningBundleFactoryTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   private CloningBundleFactory factory = CloningBundleFactory.create();
 
   @Test
@@ -76,7 +78,6 @@ public class CloningBundleFactoryTest {
 
   @Test
   public void bundleWorkingCoderSucceedsClonesOutput() {
-    TestPipeline p = TestPipeline.create();
     PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of()));
     PCollection<KV<String, Integer>> kvs =
         created
@@ -101,7 +102,6 @@ public class CloningBundleFactoryTest {
 
   @Test
   public void keyedBundleWorkingCoderSucceedsClonesOutput() {
-    TestPipeline p = TestPipeline.create();
     PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of()));
 
     PCollection<KV<String, Iterable<Integer>>> keyed =
@@ -130,7 +130,6 @@ public class CloningBundleFactoryTest {
 
   @Test
   public void bundleEncodeFailsAddFails() {
-    TestPipeline p = TestPipeline.create();
     PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
     UncommittedBundle<Record> bundle = factory.createBundle(pc);
 
@@ -142,7 +141,6 @@ public class CloningBundleFactoryTest {
 
   @Test
   public void bundleDecodeFailsAddFails() {
-    TestPipeline p = TestPipeline.create();
     PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoDecodeCoder()));
     UncommittedBundle<Record> bundle = factory.createBundle(pc);
 
@@ -154,7 +152,6 @@ public class CloningBundleFactoryTest {
 
   @Test
   public void keyedBundleEncodeFailsAddFails() {
-    TestPipeline p = TestPipeline.create();
     PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
     UncommittedBundle<Record> bundle =
         factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc);
@@ -167,7 +164,6 @@ public class CloningBundleFactoryTest {
 
   @Test
   public void keyedBundleDecodeFailsAddFails() {
-    TestPipeline p = TestPipeline.create();
     PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoDecodeCoder()));
     UncommittedBundle<Record> bundle =
         factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index c6986c0..736f554 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -47,7 +48,10 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class CommittedResultTest implements Serializable {
-  private transient TestPipeline p = TestPipeline.create();
+
+  @Rule
+  public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   private transient PCollection<Integer> created = p.apply(Create.of(1, 2));
   private transient AppliedPTransform<?, ?, ?> transform =
       AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index deefc68..35245f4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -61,8 +61,15 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class CopyOnAccessInMemoryStateInternalsTest {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
   @Rule public ExpectedException thrown = ExpectedException.none();
   private String key = "foo";
+
+  public CopyOnAccessInMemoryStateInternalsTest() {
+    pipeline = TestPipeline.create();
+  }
+
   @Test
   public void testGetWithEmpty() {
     CopyOnAccessInMemoryStateInternals<String> internals =
@@ -167,7 +174,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     CombineFn<Long, long[], Long> sumLongFn = new Sum.SumLongFn();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    CoderRegistry reg = TestPipeline.create().getCoderRegistry();
+    CoderRegistry reg = pipeline.getCoderRegistry();
     StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag =
         StateTags.combiningValue("summer",
             sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
@@ -197,7 +204,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     KeyedCombineFn<String, Long, long[], Long> sumLongFn = new Sum.SumLongFn().asKeyedFn();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    CoderRegistry reg = TestPipeline.create().getCoderRegistry();
+    CoderRegistry reg = pipeline.getCoderRegistry();
     StateTag<String, AccumulatorCombiningState<Long, long[], Long>> stateTag =
         StateTags.keyedCombiningValue(
             "summer",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index b88c9a4..c3bbe2d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -60,8 +60,9 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class DirectGraphVisitorTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
+  @Rule public transient TestPipeline p = TestPipeline.create()
+                                                      .enableAbandonedNodeEnforcement(false);
 
-  private transient TestPipeline p = TestPipeline.create();
   private transient DirectGraphVisitor visitor = new DirectGraphVisitor();
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index f11c370..bf36204 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -68,6 +68,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -77,7 +78,6 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class EvaluationContextTest {
-  private TestPipeline p;
   private EvaluationContext context;
 
   private PCollection<Integer> created;
@@ -92,13 +92,14 @@ public class EvaluationContextTest {
   private AppliedPTransform<?, ?, ?> viewProducer;
   private AppliedPTransform<?, ?, ?> unboundedProducer;
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
     DirectRunner runner =
         DirectRunner.fromOptions(PipelineOptionsFactory.create());
 
-    p = TestPipeline.create();
-
     created = p.apply(Create.of(1, 2, 3));
     downstream = created.apply(WithKeys.<String, Integer>of("foo"));
     view = created.apply(View.<Integer>asIterable());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 9e22c36..cda68f0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -48,9 +49,11 @@ import org.junit.runners.JUnit4;
 public class FlattenEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Test
   public void testFlattenInMemoryEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
     PCollection<Integer> left = p.apply("left", Create.of(1, 2, 4));
     PCollection<Integer> right = p.apply("right", Create.of(-1, 2, -4));
     PCollectionList<Integer> list = PCollectionList.of(left).and(right);
@@ -118,7 +121,6 @@ public class FlattenEvaluatorFactoryTest {
 
   @Test
   public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception {
-    TestPipeline p = TestPipeline.create();
     PCollectionList<Integer> list = PCollectionList.empty(p);
 
     PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index f0b29f0..fefafd0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -52,9 +53,11 @@ import org.junit.runners.JUnit4;
 public class GroupByKeyEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Test
   public void testInMemoryEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
     KV<String, Integer> firstFoo = KV.of("foo", -1);
     KV<String, Integer> secondFoo = KV.of("foo", 1);
     KV<String, Integer> thirdFoo = KV.of("foo", 3);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 7efdb3d..94514ad 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -52,9 +53,11 @@ import org.junit.runners.JUnit4;
 public class GroupByKeyOnlyEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Test
   public void testInMemoryEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
     KV<String, Integer> firstFoo = KV.of("foo", -1);
     KV<String, Integer> secondFoo = KV.of("foo", 1);
     KV<String, Integer> thirdFoo = KV.of("foo", 3);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 6ab8aea..2448078 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -46,14 +46,16 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class ImmutabilityCheckingBundleFactoryTest {
+
+  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
   @Rule public ExpectedException thrown = ExpectedException.none();
   private ImmutabilityCheckingBundleFactory factory;
   private PCollection<byte[]> created;
   private PCollection<byte[]> transformed;
 
+
   @Before
   public void setup() {
-    TestPipeline p = TestPipeline.create();
     created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of()));
     transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>()));
     DirectGraphVisitor visitor = new DirectGraphVisitor();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 1ad6ba6..cd3e9b4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -42,6 +42,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class ImmutabilityEnforcementFactoryTest implements Serializable {
+  @Rule public transient TestPipeline p =
+      TestPipeline.create().enableAbandonedNodeEnforcement(false);
   @Rule public transient ExpectedException thrown = ExpectedException.none();
   private transient ImmutabilityEnforcementFactory factory;
   private transient BundleFactory bundleFactory;
@@ -52,7 +54,6 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
   public void setup() {
     factory = new ImmutabilityEnforcementFactory();
     bundleFactory = ImmutableListBundleFactory.create();
-    TestPipeline p = TestPipeline.create();
     pcollection =
         p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
             .apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index a36c408..46f02cd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -57,6 +57,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class ImmutableListBundleFactoryTest {
+  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   private ImmutableListBundleFactory bundleFactory = ImmutableListBundleFactory.create();
@@ -66,13 +67,12 @@ public class ImmutableListBundleFactoryTest {
 
   @Before
   public void setup() {
-    TestPipeline p = TestPipeline.create();
     created = p.apply(Create.of(1, 2, 3));
     downstream = created.apply(WithKeys.<String, Integer>of("foo"));
   }
 
   private <T> void createKeyedBundle(Coder<T> coder, T key) throws Exception {
-    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+    PCollection<Integer> pcollection = p.apply("Create", Create.of(1));
     StructuralKey<?> skey = StructuralKey.of(key, coder);
 
     UncommittedBundle<Integer> inFlightBundle = bundleFactory.createKeyedBundle(skey, pcollection);
@@ -87,9 +87,17 @@ public class ImmutableListBundleFactoryTest {
   }
 
   @Test
-  public void keyedWithKeyShouldCreateKeyedBundle() throws Exception {
+  public void keyedWithStringKeyShouldCreateKeyedBundle() throws Exception {
     createKeyedBundle(StringUtf8Coder.of(), "foo");
+  }
+
+  @Test
+  public void keyedWithVarIntKeyShouldCreateKeyedBundle() throws Exception {
     createKeyedBundle(VarIntCoder.of(), 1234);
+  }
+
+  @Test
+  public void keyedWithByteArrayKeyShouldCreateKeyedBundle() throws Exception {
     createKeyedBundle(ByteArrayCoder.of(), new byte[] {0, 2, 4, 99});
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index 0852cd3..eef3375 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertThat;
 import com.google.common.collect.ImmutableSet;
 import java.util.Collections;
 import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -53,11 +52,12 @@ public class KeyedPValueTrackingVisitorTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   private KeyedPValueTrackingVisitor visitor;
-  private Pipeline p;
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   @Before
   public void setup() {
-    p = TestPipeline.create();
+
     @SuppressWarnings("rawtypes")
     Set<Class<? extends PTransform>> producesKeyed =
         ImmutableSet.<Class<? extends PTransform>>of(PrimitiveKeyer.class, CompositeKeyer.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index d48ac14..1a3207b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -72,10 +73,12 @@ public class ParDoEvaluatorTest {
   private List<TupleTag<?>> sideOutputTags;
   private BundleFactory bundleFactory;
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
-    TestPipeline p = TestPipeline.create();
     inputPc = p.apply(Create.of(1, 2, 3));
     mainOutputTag = new TupleTag<Integer>() {};
     sideOutputTags = TupleTagList.empty().getAll();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index cc7d88a..183decd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -96,12 +96,14 @@ public class SideInputContainerTest {
       };
 
   @Rule
+  public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   @Mock
   private EvaluationContext context;
 
-  private TestPipeline pipeline;
 
   private SideInputContainer container;
 
@@ -114,7 +116,6 @@ public class SideInputContainerTest {
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
-    pipeline = TestPipeline.create();
 
     PCollection<Integer> create =
         pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index 326310b..d312aa3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -91,6 +92,10 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
 
   private static final BundleFactory BUNDLE_FACTORY = ImmutableListBundleFactory.create();
 
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
@@ -106,7 +111,6 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
   public void windowCleanupScheduled() throws Exception {
     // To test the factory, first we set up a pipeline and then we use the constructed
     // pipeline to create the right parameters to pass to the factory
-    TestPipeline pipeline = TestPipeline.create();
 
     final String stateId = "my-state-id";
 
@@ -208,7 +212,6 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
   public void testUnprocessedElements() throws Exception {
     // To test the factory, first we set up a pipeline and then we use the constructed
     // pipeline to create the right parameters to pass to the factory
-    TestPipeline pipeline = TestPipeline.create();
 
     final String stateId = "my-state-id";
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
index d3a2cca..0d94b7a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matchers;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -44,9 +45,11 @@ public class StepTransformResultTest {
   private BundleFactory bundleFactory;
   private PCollection<Integer> pc;
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
-    TestPipeline p = TestPipeline.create();
     pc = p.apply(Create.of(1, 2, 3));
     transform = DirectGraphs.getGraph(p).getProducer(pc);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 6bb8623..c5b3b3d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -41,6 +41,7 @@ import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -52,6 +53,9 @@ public class TestStreamEvaluatorFactoryTest {
   private BundleFactory bundleFactory;
   private EvaluationContext context;
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
     context = mock(EvaluationContext.class);
@@ -62,7 +66,6 @@ public class TestStreamEvaluatorFactoryTest {
   /** Demonstrates that returned evaluators produce elements in sequence. */
   @Test
   public void producesElementsInSequence() throws Exception {
-    TestPipeline p = TestPipeline.create();
     PCollection<Integer> streamVals =
         p.apply(
             TestStream.create(VarIntCoder.of())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 4ad22bc..e66ffcf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -73,6 +73,9 @@ public class TransformExecutorTest {
   @Mock private EvaluationContext evaluationContext;
   @Mock private TransformEvaluatorRegistry registry;
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
@@ -85,7 +88,6 @@ public class TransformExecutorTest {
     evaluatorCompleted = new CountDownLatch(1);
     completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);
 
-    TestPipeline p = TestPipeline.create();
     created = p.apply(Create.of("foo", "spam", "third"));
     PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index dd36a2f..92d668e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -72,6 +72,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -93,10 +94,12 @@ public class UnboundedReadEvaluatorFactoryTest {
   private UnboundedSource<Long, ?> source;
   private DirectGraph graph;
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
     source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
-    TestPipeline p = TestPipeline.create();
     longs = p.apply(Read.from(source));
 
     context = mock(EvaluationContext.class);
@@ -190,7 +193,6 @@ public class UnboundedReadEvaluatorFactoryTest {
         new TestUnboundedSource<>(BigEndianLongCoder.of(), outputs);
     source.dedupes = true;
 
-    TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = getProducer(pcollection);
 
@@ -231,7 +233,6 @@ public class UnboundedReadEvaluatorFactoryTest {
 
   @Test
   public void noElementsAvailableReaderIncludedInResidual() throws Exception {
-    TestPipeline p = TestPipeline.create();
     // Read with a very slow rate so by the second read there are no more elements
     PCollection<Long> pcollection =
         p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L)));
@@ -291,7 +292,6 @@ public class UnboundedReadEvaluatorFactoryTest {
     TestUnboundedSource<Long> source =
         new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]));
 
-    TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
     DirectGraph graph = DirectGraphs.getGraph(p);
     AppliedPTransform<?, ?, ?> sourceTransform =
@@ -337,7 +337,6 @@ public class UnboundedReadEvaluatorFactoryTest {
     TestUnboundedSource<Long> source =
         new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]));
 
-    TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform =
         DirectGraphs.getGraph(p).getProducer(pcollection);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 7c08009..6baf55a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -52,9 +53,11 @@ import org.junit.runners.JUnit4;
 public class ViewEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Test
   public void testInMemoryEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> input = p.apply(Create.of("foo", "bar"));
     CreatePCollectionView<String, Iterable<String>> createView =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
index acdabb6..8d6e73f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -51,9 +52,11 @@ public class WatermarkCallbackExecutorTest {
   private AppliedPTransform<?, ?, ?> create;
   private AppliedPTransform<?, ?, ?> sum;
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
-    TestPipeline p = TestPipeline.create();
     PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
     PCollection<Integer> summed = created.apply(Sum.integersGlobally());
     DirectGraph graph = DirectGraphs.getGraph(p);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index eb4d0cd..abc8a28 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -70,6 +70,7 @@ import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -94,9 +95,11 @@ public class WatermarkManagerTest implements Serializable {
   private transient BundleFactory bundleFactory;
   private DirectGraph graph;
 
+  @Rule
+  public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
-    TestPipeline p = TestPipeline.create();
 
     createdInts = p.apply("createdInts", Create.of(1, 2, 3));
 
@@ -278,7 +281,6 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void getWatermarkMultiIdenticalInput() {
-    TestPipeline p = TestPipeline.create();
     PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
     PCollection<Integer> multiConsumer =
         PCollectionList.of(created).and(created).apply(Flatten.<Integer>pCollections());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 66c28ce..9d0c68d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -52,6 +52,7 @@ import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -96,10 +97,12 @@ public class WindowEvaluatorFactoryTest {
           ImmutableList.of(GlobalWindow.INSTANCE, intervalWindow1, intervalWindow2),
           PaneInfo.NO_FIRING);
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
-    TestPipeline p = TestPipeline.create();
     input = p.apply(Create.of(1L, 2L, 3L));
 
     bundleFactory = ImmutableListBundleFactory.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index aeb75ed..a8c4c02 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -69,6 +69,7 @@ public class WriteWithShardingFactoryTest {
   public static final int INPUT_SIZE = 10000;
   @Rule public TemporaryFolder tmp = new TemporaryFolder();
   private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
+  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   @Test
   public void dynamicallyReshardedWrite() throws Exception {
@@ -81,7 +82,6 @@ public class WriteWithShardingFactoryTest {
     String fileName = "resharded_write";
     String outputPath = tmp.getRoot().getAbsolutePath();
     String targetLocation = IOChannelUtils.resolve(outputPath, fileName);
-    TestPipeline p = TestPipeline.create();
     // TextIO is implemented in terms of the Write PTransform. When sharding is not specified,
     // resharding should be automatically applied
     p.apply(Create.of(strs)).apply(TextIO.Write.to(targetLocation));
@@ -134,7 +134,7 @@ public class WriteWithShardingFactoryTest {
   public void keyBasedOnCountFnWithOneElement() throws Exception {
     PCollectionView<Long> elementCountView =
         PCollectionViews.singletonView(
-            TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+            p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
     KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0);
     DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);
 
@@ -149,7 +149,7 @@ public class WriteWithShardingFactoryTest {
   public void keyBasedOnCountFnWithTwoElements() throws Exception {
     PCollectionView<Long> elementCountView =
         PCollectionViews.singletonView(
-            TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+            p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
     KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0);
     DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);
 
@@ -167,7 +167,7 @@ public class WriteWithShardingFactoryTest {
   public void keyBasedOnCountFnFewElementsThreeShards() throws Exception {
     PCollectionView<Long> elementCountView =
         PCollectionViews.singletonView(
-            TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+            p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
     KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0);
     DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);
 
@@ -191,7 +191,7 @@ public class WriteWithShardingFactoryTest {
   public void keyBasedOnCountFnManyElements() throws Exception {
     PCollectionView<Long> elementCountView =
         PCollectionViews.singletonView(
-            TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+            p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
     KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0);
     DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);
 
@@ -214,7 +214,7 @@ public class WriteWithShardingFactoryTest {
   public void keyBasedOnCountFnFewElementsExtraShards() throws Exception {
     PCollectionView<Long> elementCountView =
         PCollectionViews.singletonView(
-            TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+            p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
     KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 10);
     DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);
 
@@ -238,7 +238,7 @@ public class WriteWithShardingFactoryTest {
   public void keyBasedOnCountFnManyElementsExtraShards() throws Exception {
     PCollectionView<Long> elementCountView =
         PCollectionViews.singletonView(
-            TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+            p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
     KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 3);
     DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);