You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/20 19:04:57 UTC
[07/17] incubator-beam git commit: Migrated the beam-runners-direct-java module to TestPipeline as a JUnit rule.
Migrated the beam-runners-direct-java module to TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09c404a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09c404a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09c404a6
Branch: refs/heads/master
Commit: 09c404a6c407898fcbc2fd22797cba4da8839b93
Parents: b671025
Author: Stas Levin <st...@gmail.com>
Authored: Mon Dec 19 10:20:16 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 09:55:45 2016 -0800
----------------------------------------------------------------------
.../direct/BoundedReadEvaluatorFactoryTest.java | 13 ++++++-------
.../beam/runners/direct/CloningBundleFactoryTest.java | 8 ++------
.../beam/runners/direct/CommittedResultTest.java | 6 +++++-
.../CopyOnAccessInMemoryStateInternalsTest.java | 11 +++++++++--
.../beam/runners/direct/DirectGraphVisitorTest.java | 3 ++-
.../beam/runners/direct/EvaluationContextTest.java | 7 ++++---
.../runners/direct/FlattenEvaluatorFactoryTest.java | 6 ++++--
.../direct/GroupByKeyEvaluatorFactoryTest.java | 5 ++++-
.../direct/GroupByKeyOnlyEvaluatorFactoryTest.java | 5 ++++-
.../direct/ImmutabilityCheckingBundleFactoryTest.java | 4 +++-
.../direct/ImmutabilityEnforcementFactoryTest.java | 3 ++-
.../direct/ImmutableListBundleFactoryTest.java | 14 +++++++++++---
.../direct/KeyedPValueTrackingVisitorTest.java | 6 +++---
.../beam/runners/direct/ParDoEvaluatorTest.java | 5 ++++-
.../beam/runners/direct/SideInputContainerTest.java | 5 +++--
.../direct/StatefulParDoEvaluatorFactoryTest.java | 7 +++++--
.../beam/runners/direct/StepTransformResultTest.java | 5 ++++-
.../direct/TestStreamEvaluatorFactoryTest.java | 5 ++++-
.../beam/runners/direct/TransformExecutorTest.java | 4 +++-
.../direct/UnboundedReadEvaluatorFactoryTest.java | 9 ++++-----
.../beam/runners/direct/ViewEvaluatorFactoryTest.java | 5 ++++-
.../runners/direct/WatermarkCallbackExecutorTest.java | 5 ++++-
.../beam/runners/direct/WatermarkManagerTest.java | 6 ++++--
.../runners/direct/WindowEvaluatorFactoryTest.java | 5 ++++-
.../runners/direct/WriteWithShardingFactoryTest.java | 14 +++++++-------
25 files changed, 109 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index acb1444..97eae27 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -62,6 +62,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -82,11 +83,13 @@ public class BoundedReadEvaluatorFactoryTest {
private BundleFactory bundleFactory;
private AppliedPTransform<?, ?, ?> longsProducer;
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
source = CountingSource.upTo(10L);
- TestPipeline p = TestPipeline.create();
longs = p.apply(Read.from(source));
factory =
@@ -142,7 +145,7 @@ public class BoundedReadEvaluatorFactoryTest {
elems[i] = (long) i;
}
PCollection<Long> read =
- TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems)));
+ p.apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems)));
AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read);
Collection<CommittedBundle<?>> unreadInputs =
new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1);
@@ -191,8 +194,7 @@ public class BoundedReadEvaluatorFactoryTest {
BoundedReadEvaluatorFactory factory = new BoundedReadEvaluatorFactory(context, 0L);
PCollection<Long> read =
- TestPipeline.create()
- .apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L))));
+ p.apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L))));
AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read);
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
@@ -298,8 +300,6 @@ public class BoundedReadEvaluatorFactoryTest {
@Test
public void boundedSourceEvaluatorClosesReader() throws Exception {
TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
-
- TestPipeline p = TestPipeline.create();
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);
@@ -320,7 +320,6 @@ public class BoundedReadEvaluatorFactoryTest {
public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of());
- TestPipeline p = TestPipeline.create();
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index bafab59..e5299a2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -62,6 +62,8 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class CloningBundleFactoryTest {
@Rule public ExpectedException thrown = ExpectedException.none();
+ @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
private CloningBundleFactory factory = CloningBundleFactory.create();
@Test
@@ -76,7 +78,6 @@ public class CloningBundleFactoryTest {
@Test
public void bundleWorkingCoderSucceedsClonesOutput() {
- TestPipeline p = TestPipeline.create();
PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of()));
PCollection<KV<String, Integer>> kvs =
created
@@ -101,7 +102,6 @@ public class CloningBundleFactoryTest {
@Test
public void keyedBundleWorkingCoderSucceedsClonesOutput() {
- TestPipeline p = TestPipeline.create();
PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of()));
PCollection<KV<String, Iterable<Integer>>> keyed =
@@ -130,7 +130,6 @@ public class CloningBundleFactoryTest {
@Test
public void bundleEncodeFailsAddFails() {
- TestPipeline p = TestPipeline.create();
PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
UncommittedBundle<Record> bundle = factory.createBundle(pc);
@@ -142,7 +141,6 @@ public class CloningBundleFactoryTest {
@Test
public void bundleDecodeFailsAddFails() {
- TestPipeline p = TestPipeline.create();
PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoDecodeCoder()));
UncommittedBundle<Record> bundle = factory.createBundle(pc);
@@ -154,7 +152,6 @@ public class CloningBundleFactoryTest {
@Test
public void keyedBundleEncodeFailsAddFails() {
- TestPipeline p = TestPipeline.create();
PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
UncommittedBundle<Record> bundle =
factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc);
@@ -167,7 +164,6 @@ public class CloningBundleFactoryTest {
@Test
public void keyedBundleDecodeFailsAddFails() {
- TestPipeline p = TestPipeline.create();
PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoDecodeCoder()));
UncommittedBundle<Record> bundle =
factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index c6986c0..736f554 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -47,7 +48,10 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class CommittedResultTest implements Serializable {
- private transient TestPipeline p = TestPipeline.create();
+
+ @Rule
+ public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
private transient PCollection<Integer> created = p.apply(Create.of(1, 2));
private transient AppliedPTransform<?, ?, ?> transform =
AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index deefc68..35245f4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -61,8 +61,15 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class CopyOnAccessInMemoryStateInternalsTest {
+
+ @Rule public TestPipeline pipeline = TestPipeline.create();
@Rule public ExpectedException thrown = ExpectedException.none();
private String key = "foo";
+
+ public CopyOnAccessInMemoryStateInternalsTest() {
+ pipeline = TestPipeline.create();
+ }
+
@Test
public void testGetWithEmpty() {
CopyOnAccessInMemoryStateInternals<String> internals =
@@ -167,7 +174,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CombineFn<Long, long[], Long> sumLongFn = new Sum.SumLongFn();
StateNamespace namespace = new StateNamespaceForTest("foo");
- CoderRegistry reg = TestPipeline.create().getCoderRegistry();
+ CoderRegistry reg = pipeline.getCoderRegistry();
StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag =
StateTags.combiningValue("summer",
sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
@@ -197,7 +204,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
KeyedCombineFn<String, Long, long[], Long> sumLongFn = new Sum.SumLongFn().asKeyedFn();
StateNamespace namespace = new StateNamespaceForTest("foo");
- CoderRegistry reg = TestPipeline.create().getCoderRegistry();
+ CoderRegistry reg = pipeline.getCoderRegistry();
StateTag<String, AccumulatorCombiningState<Long, long[], Long>> stateTag =
StateTags.keyedCombiningValue(
"summer",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index b88c9a4..c3bbe2d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -60,8 +60,9 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class DirectGraphVisitorTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();
+ @Rule public transient TestPipeline p = TestPipeline.create()
+ .enableAbandonedNodeEnforcement(false);
- private transient TestPipeline p = TestPipeline.create();
private transient DirectGraphVisitor visitor = new DirectGraphVisitor();
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index f11c370..bf36204 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -68,6 +68,7 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -77,7 +78,6 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class EvaluationContextTest {
- private TestPipeline p;
private EvaluationContext context;
private PCollection<Integer> created;
@@ -92,13 +92,14 @@ public class EvaluationContextTest {
private AppliedPTransform<?, ?, ?> viewProducer;
private AppliedPTransform<?, ?, ?> unboundedProducer;
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
DirectRunner runner =
DirectRunner.fromOptions(PipelineOptionsFactory.create());
- p = TestPipeline.create();
-
created = p.apply(Create.of(1, 2, 3));
downstream = created.apply(WithKeys.<String, Integer>of("foo"));
view = created.apply(View.<Integer>asIterable());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 9e22c36..cda68f0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -48,9 +49,11 @@ import org.junit.runners.JUnit4;
public class FlattenEvaluatorFactoryTest {
private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Test
public void testFlattenInMemoryEvaluator() throws Exception {
- TestPipeline p = TestPipeline.create();
PCollection<Integer> left = p.apply("left", Create.of(1, 2, 4));
PCollection<Integer> right = p.apply("right", Create.of(-1, 2, -4));
PCollectionList<Integer> list = PCollectionList.of(left).and(right);
@@ -118,7 +121,6 @@ public class FlattenEvaluatorFactoryTest {
@Test
public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception {
- TestPipeline p = TestPipeline.create();
PCollectionList<Integer> list = PCollectionList.empty(p);
PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index f0b29f0..fefafd0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -52,9 +53,11 @@ import org.junit.runners.JUnit4;
public class GroupByKeyEvaluatorFactoryTest {
private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Test
public void testInMemoryEvaluator() throws Exception {
- TestPipeline p = TestPipeline.create();
KV<String, Integer> firstFoo = KV.of("foo", -1);
KV<String, Integer> secondFoo = KV.of("foo", 1);
KV<String, Integer> thirdFoo = KV.of("foo", 3);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 7efdb3d..94514ad 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -52,9 +53,11 @@ import org.junit.runners.JUnit4;
public class GroupByKeyOnlyEvaluatorFactoryTest {
private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Test
public void testInMemoryEvaluator() throws Exception {
- TestPipeline p = TestPipeline.create();
KV<String, Integer> firstFoo = KV.of("foo", -1);
KV<String, Integer> secondFoo = KV.of("foo", 1);
KV<String, Integer> thirdFoo = KV.of("foo", 3);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 6ab8aea..2448078 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -46,14 +46,16 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class ImmutabilityCheckingBundleFactoryTest {
+
+ @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
@Rule public ExpectedException thrown = ExpectedException.none();
private ImmutabilityCheckingBundleFactory factory;
private PCollection<byte[]> created;
private PCollection<byte[]> transformed;
+
@Before
public void setup() {
- TestPipeline p = TestPipeline.create();
created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of()));
transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>()));
DirectGraphVisitor visitor = new DirectGraphVisitor();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 1ad6ba6..cd3e9b4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -42,6 +42,8 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class ImmutabilityEnforcementFactoryTest implements Serializable {
+ @Rule public transient TestPipeline p =
+ TestPipeline.create().enableAbandonedNodeEnforcement(false);
@Rule public transient ExpectedException thrown = ExpectedException.none();
private transient ImmutabilityEnforcementFactory factory;
private transient BundleFactory bundleFactory;
@@ -52,7 +54,6 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
public void setup() {
factory = new ImmutabilityEnforcementFactory();
bundleFactory = ImmutableListBundleFactory.create();
- TestPipeline p = TestPipeline.create();
pcollection =
p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index a36c408..46f02cd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -57,6 +57,7 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class ImmutableListBundleFactoryTest {
+ @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
@Rule public ExpectedException thrown = ExpectedException.none();
private ImmutableListBundleFactory bundleFactory = ImmutableListBundleFactory.create();
@@ -66,13 +67,12 @@ public class ImmutableListBundleFactoryTest {
@Before
public void setup() {
- TestPipeline p = TestPipeline.create();
created = p.apply(Create.of(1, 2, 3));
downstream = created.apply(WithKeys.<String, Integer>of("foo"));
}
private <T> void createKeyedBundle(Coder<T> coder, T key) throws Exception {
- PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+ PCollection<Integer> pcollection = p.apply("Create", Create.of(1));
StructuralKey<?> skey = StructuralKey.of(key, coder);
UncommittedBundle<Integer> inFlightBundle = bundleFactory.createKeyedBundle(skey, pcollection);
@@ -87,9 +87,17 @@ public class ImmutableListBundleFactoryTest {
}
@Test
- public void keyedWithKeyShouldCreateKeyedBundle() throws Exception {
+ public void keyedWithStringKeyShouldCreateKeyedBundle() throws Exception {
createKeyedBundle(StringUtf8Coder.of(), "foo");
+ }
+
+ @Test
+ public void keyedWithVarIntKeyShouldCreateKeyedBundle() throws Exception {
createKeyedBundle(VarIntCoder.of(), 1234);
+ }
+
+ @Test
+ public void keyedWithByteArrayKeyShouldCreateKeyedBundle() throws Exception {
createKeyedBundle(ByteArrayCoder.of(), new byte[] {0, 2, 4, 99});
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index 0852cd3..eef3375 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableSet;
import java.util.Collections;
import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -53,11 +52,12 @@ public class KeyedPValueTrackingVisitorTest {
@Rule public ExpectedException thrown = ExpectedException.none();
private KeyedPValueTrackingVisitor visitor;
- private Pipeline p;
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
@Before
public void setup() {
- p = TestPipeline.create();
+
@SuppressWarnings("rawtypes")
Set<Class<? extends PTransform>> producesKeyed =
ImmutableSet.<Class<? extends PTransform>>of(PrimitiveKeyer.class, CompositeKeyer.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index d48ac14..1a3207b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -72,10 +73,12 @@ public class ParDoEvaluatorTest {
private List<TupleTag<?>> sideOutputTags;
private BundleFactory bundleFactory;
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
- TestPipeline p = TestPipeline.create();
inputPc = p.apply(Create.of(1, 2, 3));
mainOutputTag = new TupleTag<Integer>() {};
sideOutputTags = TupleTagList.empty().getAll();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index cc7d88a..183decd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -96,12 +96,14 @@ public class SideInputContainerTest {
};
@Rule
+ public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ @Rule
public ExpectedException thrown = ExpectedException.none();
@Mock
private EvaluationContext context;
- private TestPipeline pipeline;
private SideInputContainer container;
@@ -114,7 +116,6 @@ public class SideInputContainerTest {
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
- pipeline = TestPipeline.create();
PCollection<Integer> create =
pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index 326310b..d312aa3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -91,6 +92,10 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
private static final BundleFactory BUNDLE_FACTORY = ImmutableListBundleFactory.create();
+ @Rule
+ public transient TestPipeline pipeline =
+ TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
@@ -106,7 +111,6 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
public void windowCleanupScheduled() throws Exception {
// To test the factory, first we set up a pipeline and then we use the constructed
// pipeline to create the right parameters to pass to the factory
- TestPipeline pipeline = TestPipeline.create();
final String stateId = "my-state-id";
@@ -208,7 +212,6 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
public void testUnprocessedElements() throws Exception {
// To test the factory, first we set up a pipeline and then we use the constructed
// pipeline to create the right parameters to pass to the factory
- TestPipeline pipeline = TestPipeline.create();
final String stateId = "my-state-id";
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
index d3a2cca..0d94b7a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -44,9 +45,11 @@ public class StepTransformResultTest {
private BundleFactory bundleFactory;
private PCollection<Integer> pc;
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
- TestPipeline p = TestPipeline.create();
pc = p.apply(Create.of(1, 2, 3));
transform = DirectGraphs.getGraph(p).getProducer(pc);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 6bb8623..c5b3b3d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -41,6 +41,7 @@ import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -52,6 +53,9 @@ public class TestStreamEvaluatorFactoryTest {
private BundleFactory bundleFactory;
private EvaluationContext context;
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
context = mock(EvaluationContext.class);
@@ -62,7 +66,6 @@ public class TestStreamEvaluatorFactoryTest {
/** Demonstrates that returned evaluators produce elements in sequence. */
@Test
public void producesElementsInSequence() throws Exception {
- TestPipeline p = TestPipeline.create();
PCollection<Integer> streamVals =
p.apply(
TestStream.create(VarIntCoder.of())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 4ad22bc..e66ffcf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -73,6 +73,9 @@ public class TransformExecutorTest {
@Mock private EvaluationContext evaluationContext;
@Mock private TransformEvaluatorRegistry registry;
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
@@ -85,7 +88,6 @@ public class TransformExecutorTest {
evaluatorCompleted = new CountDownLatch(1);
completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);
- TestPipeline p = TestPipeline.create();
created = p.apply(Create.of("foo", "spam", "third"));
PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index dd36a2f..92d668e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -72,6 +72,7 @@ import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -93,10 +94,12 @@ public class UnboundedReadEvaluatorFactoryTest {
private UnboundedSource<Long, ?> source;
private DirectGraph graph;
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
- TestPipeline p = TestPipeline.create();
longs = p.apply(Read.from(source));
context = mock(EvaluationContext.class);
@@ -190,7 +193,6 @@ public class UnboundedReadEvaluatorFactoryTest {
new TestUnboundedSource<>(BigEndianLongCoder.of(), outputs);
source.dedupes = true;
- TestPipeline p = TestPipeline.create();
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = getProducer(pcollection);
@@ -231,7 +233,6 @@ public class UnboundedReadEvaluatorFactoryTest {
@Test
public void noElementsAvailableReaderIncludedInResidual() throws Exception {
- TestPipeline p = TestPipeline.create();
// Read with a very slow rate so by the second read there are no more elements
PCollection<Long> pcollection =
p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L)));
@@ -291,7 +292,6 @@ public class UnboundedReadEvaluatorFactoryTest {
TestUnboundedSource<Long> source =
new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]));
- TestPipeline p = TestPipeline.create();
PCollection<Long> pcollection = p.apply(Read.from(source));
DirectGraph graph = DirectGraphs.getGraph(p);
AppliedPTransform<?, ?, ?> sourceTransform =
@@ -337,7 +337,6 @@ public class UnboundedReadEvaluatorFactoryTest {
TestUnboundedSource<Long> source =
new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]));
- TestPipeline p = TestPipeline.create();
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform =
DirectGraphs.getGraph(p).getProducer(pcollection);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 7c08009..6baf55a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -52,9 +53,11 @@ import org.junit.runners.JUnit4;
public class ViewEvaluatorFactoryTest {
private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Test
public void testInMemoryEvaluator() throws Exception {
- TestPipeline p = TestPipeline.create();
PCollection<String> input = p.apply(Create.of("foo", "bar"));
CreatePCollectionView<String, Iterable<String>> createView =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
index acdabb6..8d6e73f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -51,9 +52,11 @@ public class WatermarkCallbackExecutorTest {
private AppliedPTransform<?, ?, ?> create;
private AppliedPTransform<?, ?, ?> sum;
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
- TestPipeline p = TestPipeline.create();
PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
PCollection<Integer> summed = created.apply(Sum.integersGlobally());
DirectGraph graph = DirectGraphs.getGraph(p);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index eb4d0cd..abc8a28 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -70,6 +70,7 @@ import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -94,9 +95,11 @@ public class WatermarkManagerTest implements Serializable {
private transient BundleFactory bundleFactory;
private DirectGraph graph;
+ @Rule
+ public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
- TestPipeline p = TestPipeline.create();
createdInts = p.apply("createdInts", Create.of(1, 2, 3));
@@ -278,7 +281,6 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void getWatermarkMultiIdenticalInput() {
- TestPipeline p = TestPipeline.create();
PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
PCollection<Integer> multiConsumer =
PCollectionList.of(created).and(created).apply(Flatten.<Integer>pCollections());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 66c28ce..9d0c68d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -52,6 +52,7 @@ import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -96,10 +97,12 @@ public class WindowEvaluatorFactoryTest {
ImmutableList.of(GlobalWindow.INSTANCE, intervalWindow1, intervalWindow2),
PaneInfo.NO_FIRING);
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
- TestPipeline p = TestPipeline.create();
input = p.apply(Create.of(1L, 2L, 3L));
bundleFactory = ImmutableListBundleFactory.create();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index aeb75ed..a8c4c02 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -69,6 +69,7 @@ public class WriteWithShardingFactoryTest {
public static final int INPUT_SIZE = 10000;
@Rule public TemporaryFolder tmp = new TemporaryFolder();
private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
+ @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
@Test
public void dynamicallyReshardedWrite() throws Exception {
@@ -81,7 +82,6 @@ public class WriteWithShardingFactoryTest {
String fileName = "resharded_write";
String outputPath = tmp.getRoot().getAbsolutePath();
String targetLocation = IOChannelUtils.resolve(outputPath, fileName);
- TestPipeline p = TestPipeline.create();
// TextIO is implemented in terms of the Write PTransform. When sharding is not specified,
// resharding should be automatically applied
p.apply(Create.of(strs)).apply(TextIO.Write.to(targetLocation));
@@ -134,7 +134,7 @@ public class WriteWithShardingFactoryTest {
public void keyBasedOnCountFnWithOneElement() throws Exception {
PCollectionView<Long> elementCountView =
PCollectionViews.singletonView(
- TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+ p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0);
DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);
@@ -149,7 +149,7 @@ public class WriteWithShardingFactoryTest {
public void keyBasedOnCountFnWithTwoElements() throws Exception {
PCollectionView<Long> elementCountView =
PCollectionViews.singletonView(
- TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+ p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0);
DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);
@@ -167,7 +167,7 @@ public class WriteWithShardingFactoryTest {
public void keyBasedOnCountFnFewElementsThreeShards() throws Exception {
PCollectionView<Long> elementCountView =
PCollectionViews.singletonView(
- TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+ p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0);
DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);
@@ -191,7 +191,7 @@ public class WriteWithShardingFactoryTest {
public void keyBasedOnCountFnManyElements() throws Exception {
PCollectionView<Long> elementCountView =
PCollectionViews.singletonView(
- TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+ p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0);
DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);
@@ -214,7 +214,7 @@ public class WriteWithShardingFactoryTest {
public void keyBasedOnCountFnFewElementsExtraShards() throws Exception {
PCollectionView<Long> elementCountView =
PCollectionViews.singletonView(
- TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+ p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 10);
DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);
@@ -238,7 +238,7 @@ public class WriteWithShardingFactoryTest {
public void keyBasedOnCountFnManyElementsExtraShards() throws Exception {
PCollectionView<Long> elementCountView =
PCollectionViews.singletonView(
- TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+ p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 3);
DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);