You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/22 23:34:55 UTC
[4/5] incubator-beam git commit: Add Parameters to finishSpecifying
Add Parameters to finishSpecifying
Remove the need to use getProducingTransformInternal in TypedPValue.
Ensure that every node has been fully specified before
TransformHierarchy#visit is called, so that neither the Pipeline nor the
Runner needs to finish specifying nodes explicitly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99c49b04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99c49b04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99c49b04
Branch: refs/heads/python-sdk
Commit: 99c49b040aad52fc6558d70fee65d74f59b420de
Parents: 6d97262
Author: Thomas Groh <tg...@google.com>
Authored: Thu Dec 8 14:33:36 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Dec 22 15:34:41 2016 -0800
----------------------------------------------------------------------
.../translation/ParDoBoundTranslatorTest.java | 32 ++++----
.../beam/runners/direct/DirectGraphVisitor.java | 21 ------
.../beam/runners/direct/DirectRunner.java | 1 -
.../runners/direct/DirectGraphVisitorTest.java | 32 +-------
.../direct/FlattenEvaluatorFactoryTest.java | 2 +
.../direct/KeyedPValueTrackingVisitorTest.java | 17 ++++-
.../beam/runners/spark/ForceStreamingTest.java | 2 -
.../main/java/org/apache/beam/sdk/Pipeline.java | 3 +
.../beam/sdk/runners/TransformHierarchy.java | 45 ++++++-----
.../transforms/join/KeyedPCollectionTuple.java | 32 ++++----
.../java/org/apache/beam/sdk/values/PBegin.java | 5 --
.../apache/beam/sdk/values/PCollectionList.java | 13 +---
.../beam/sdk/values/PCollectionTuple.java | 13 +---
.../java/org/apache/beam/sdk/values/PInput.java | 9 ---
.../org/apache/beam/sdk/values/POutput.java | 20 ++---
.../beam/sdk/values/POutputValueBase.java | 4 +-
.../java/org/apache/beam/sdk/values/PValue.java | 15 ++++
.../org/apache/beam/sdk/values/PValueBase.java | 3 +-
.../org/apache/beam/sdk/values/TypedPValue.java | 78 +++++++++++---------
.../sdk/runners/TransformHierarchyTest.java | 34 +++++----
.../apache/beam/sdk/transforms/ParDoTest.java | 7 +-
.../apache/beam/sdk/values/TypedPValueTest.java | 7 +-
22 files changed, 185 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
index fa94b2a..f88a94d 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
@@ -286,21 +287,22 @@ public class ParDoBoundTranslatorTest {
Arrays.asList(sideInput1, sideInput2),
Arrays.<TupleTag<String>>asList())));
- outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
- ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
-
- HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
- "processing: -42: [11, 222]", "processing: 666: [11, 222]");
- long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
- while (System.currentTimeMillis() < timeout) {
- if (EmbeddedCollector.RESULTS.containsAll(expected)) {
- break;
- }
- LOG.info("Waiting for expected results.");
- Thread.sleep(SLEEP_MILLIS);
- }
- result.cancel();
- Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+ outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
+ outputs.get(sideOutputTag).setCoder(VoidCoder.of());
+ ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
+
+ HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
+ "processing: -42: [11, 222]", "processing: 666: [11, 222]");
+ long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
+ while (System.currentTimeMillis() < timeout) {
+ if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+ break;
+ }
+ LOG.info("Waiting for expected results.");
+ Thread.sleep(SLEEP_MILLIS);
+ }
+ result.cancel();
+ Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
}
private static class TestMultiOutputWithSideInputsFn extends DoFn<Integer, String> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 425bbf1..7e6845d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
@@ -51,7 +50,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
private Set<PCollectionView<?>> views = new HashSet<>();
private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>();
private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
- private Set<PValue> toFinalize = new HashSet<>();
private int numTransforms = 0;
private boolean finalized = false;
@@ -80,9 +78,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- for (TaggedPValue consumed : node.getInputs()) {
- toFinalize.remove(consumed.getValue());
- }
AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
stepNames.put(appliedTransform, genStepName());
if (node.getInputs().isEmpty()) {
@@ -96,8 +91,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
- toFinalize.add(value);
-
AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer);
if (value instanceof PCollectionView) {
views.add((PCollectionView<?>) value);
@@ -118,20 +111,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
}
/**
- * Returns all of the {@link PValue PValues} that have been produced but not consumed. These
- * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the
- * {@link Pipeline} is executed.
- */
- public void finishSpecifyingRemainder() {
- checkState(
- finalized,
- "Can't call finishSpecifyingRemainder before the Pipeline has been completely traversed");
- for (PValue unfinalized : toFinalize) {
- unfinalized.finishSpecifying();
- }
- }
-
- /**
* Get the graph constructed by this {@link DirectGraphVisitor}, which provides
* lookups for producers and consumers of {@link PValue PValues}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7e6ea15..5793b00 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -301,7 +301,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
MetricsEnvironment.setMetricsSupported(true);
DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
pipeline.traverseTopologically(graphVisitor);
- graphVisitor.finishSpecifyingRemainder();
@SuppressWarnings("rawtypes")
KeyedPValueTrackingVisitor keyedPValueVisitor = KeyedPValueTrackingVisitor.create();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index c3bbe2d..01d11a3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -28,6 +27,7 @@ import com.google.common.collect.Iterables;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
@@ -111,6 +111,7 @@ public class DirectGraphVisitorTest implements Serializable {
FlattenPCollectionList<String> flatten = Flatten.pCollections();
PCollectionList<String> emptyList = PCollectionList.empty(p);
PCollection<String> empty = emptyList.apply(flatten);
+ empty.setCoder(StringUtf8Coder.of());
p.traverseTopologically(visitor);
DirectGraph graph = visitor.getGraph();
assertThat(
@@ -177,27 +178,6 @@ public class DirectGraphVisitorTest implements Serializable {
}
@Test
- public void getUnfinalizedPValuesContainsDanglingOutputs() {
- PCollection<String> created = p.apply(Create.of("1", "2", "3"));
- PCollection<String> transformed =
- created.apply(
- ParDo.of(
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(DoFn<String, String>.ProcessContext c)
- throws Exception {
- c.output(Integer.toString(c.element().length()));
- }
- }));
-
- assertThat(transformed.isFinishedSpecifyingInternal(), is(false));
-
- p.traverseTopologically(visitor);
- visitor.finishSpecifyingRemainder();
- assertThat(transformed.isFinishedSpecifyingInternal(), is(true));
- }
-
- @Test
public void getStepNamesContainsAllTransforms() {
PCollection<String> created = p.apply(Create.of("1", "2", "3"));
PCollection<String> transformed =
@@ -254,12 +234,4 @@ public class DirectGraphVisitorTest implements Serializable {
thrown.expectMessage("get a graph");
visitor.getGraph();
}
-
- @Test
- public void finishSpecifyingRemainderWithoutVisitingThrows() {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("completely traversed");
- thrown.expectMessage("finishSpecifyingRemainder");
- visitor.finishSpecifyingRemainder();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index cda68f0..e07c9f9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
@@ -124,6 +125,7 @@ public class FlattenEvaluatorFactoryTest {
PCollectionList<Integer> list = PCollectionList.empty(p);
PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections());
+ flattened.setCoder(VarIntCoder.of());
EvaluationContext evaluationContext = mock(EvaluationContext.class);
when(evaluationContext.createBundle(flattened))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index a1fb81b..8fac534 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat;
import java.util.Collections;
import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -34,6 +35,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -111,7 +113,13 @@ public class KeyedPValueTrackingVisitorTest {
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))))));
PCollection<KeyedWorkItem<String, KV<String, Integer>>> unkeyed =
- input.apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>()));
+ input
+ .apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>()))
+ .setCoder(
+ KeyedWorkItemCoder.of(
+ StringUtf8Coder.of(),
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+ GlobalWindow.Coder.INSTANCE));
p.traverseTopologically(visitor);
assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed)));
@@ -139,7 +147,12 @@ public class KeyedPValueTrackingVisitorTest {
PCollection<KeyedWorkItem<String, KV<String, Integer>>> keyed =
input
.apply(GroupByKey.<String, WindowedValue<KV<String, Integer>>>create())
- .apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>()));
+ .apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>()))
+ .setCoder(
+ KeyedWorkItemCoder.of(
+ StringUtf8Coder.of(),
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+ GlobalWindow.Coder.INSTANCE));
p.traverseTopologically(visitor);
assertThat(visitor.getKeyedPValues(), hasItem(keyed));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index 1b2ff08..b7b59d1 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -21,12 +21,10 @@ package org.apache.beam.runners.spark;
import static org.hamcrest.MatcherAssert.assertThat;
import java.io.IOException;
-import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 7a16f9d..eb0b199 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -171,6 +171,8 @@ public class Pipeline {
* Runs the {@link Pipeline} using its {@link PipelineRunner}.
*/
public PipelineResult run() {
+ // Ensure all of the nodes are fully specified before a PipelineRunner gets access to the
+ // pipeline.
LOG.debug("Running {} via {}", this, runner);
try {
return runner.run(this);
@@ -281,6 +283,7 @@ public class Pipeline {
* <p>Typically invoked by {@link PipelineRunner} subclasses.
*/
public void traverseTopologically(PipelineVisitor visitor) {
+ // Ensure all nodes are fully specified before visiting the pipeline
Set<PValue> visitedValues =
// Visit all the transforms, which should implicitly visit all the values.
transforms.visit(visitor);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 29e7fcb..3676e1a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
@@ -46,6 +47,8 @@ import org.apache.beam.sdk.values.TaggedPValue;
public class TransformHierarchy {
private final Node root;
private final Map<POutput, Node> producers;
+ // A map of PValue to the PInput the producing PTransform is applied to
+ private final Map<PValue, PInput> producerInput;
// Maintain a stack based on the enclosing nodes
private Node current;
@@ -53,6 +56,7 @@ public class TransformHierarchy {
root = new Node(null, null, "", null);
current = root;
producers = new HashMap<>();
+ producerInput = new HashMap<>();
}
/**
@@ -86,7 +90,13 @@ public class TransformHierarchy {
public void finishSpecifyingInput() {
// Inputs must be completely specified before they are consumed by a transform.
for (TaggedPValue inputValue : current.getInputs()) {
- inputValue.getValue().finishSpecifying();
+ Node producerNode = getProducer(inputValue.getValue());
+ PInput input = producerInput.remove(inputValue.getValue());
+ inputValue.getValue().finishSpecifying(input, producerNode.getTransform());
+ checkState(
+ producers.get(inputValue.getValue()) != null,
+ "Producer unknown for input %s",
+ inputValue);
checkState(
producers.get(inputValue.getValue()) != null,
"Producer unknown for input %s",
@@ -105,12 +115,14 @@ public class TransformHierarchy {
* nodes.
*/
public void setOutput(POutput output) {
- output.finishSpecifyingOutput();
for (TaggedPValue value : output.expand()) {
if (!producers.containsKey(value.getValue())) {
producers.put(value.getValue(), current);
}
+ value.getValue().finishSpecifyingOutput(current.input, current.transform);
+ producerInput.put(value.getValue(), current.input);
}
+ output.finishSpecifyingOutput(current.input, current.transform);
current.setOutput(output);
// TODO: Replace with a "generateDefaultNames" method.
output.recordAsOutput(current.toAppliedPTransform());
@@ -130,27 +142,26 @@ public class TransformHierarchy {
return producers.get(produced);
}
- /**
- * Returns all producing transforms for the {@link PValue PValues} contained
- * in {@code output}.
- */
- List<Node> getProducingTransforms(POutput output) {
- List<Node> producingTransforms = new ArrayList<>();
- for (TaggedPValue value : output.expand()) {
- Node producer = getProducer(value.getValue());
- if (producer != null) {
- producingTransforms.add(producer);
- }
- }
- return producingTransforms;
- }
-
public Set<PValue> visit(PipelineVisitor visitor) {
+ finishSpecifying();
Set<PValue> visitedValues = new HashSet<>();
root.visit(visitor, visitedValues);
return visitedValues;
}
+ /**
+ * Finish specifying any remaining nodes within the {@link TransformHierarchy}. These are {@link
+ * PValue PValues} that are produced as output of some {@link PTransform} but are never consumed
+ * as input. These values must still be finished specifying.
+ */
+ private void finishSpecifying() {
+ for (Entry<PValue, PInput> producerInputEntry : producerInput.entrySet()) {
+ PValue value = producerInputEntry.getKey();
+ value.finishSpecifying(producerInputEntry.getValue(), getProducer(value).getTransform());
+ }
+ producerInput.clear();
+ }
+
public Node getCurrent() {
return current;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
index 13d4ee1..b373909 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
@@ -152,13 +152,21 @@ public class KeyedPCollectionTuple<K> implements PInput {
return pipeline;
}
- @Override
- public void finishSpecifying() {
- for (TaggedKeyedPCollection<K, ?> taggedPCollection : keyedCollections) {
- taggedPCollection.pCollection.finishSpecifying();
+ private static <K, V> Coder<K> getKeyCoder(PCollection<KV<K, V>> pc) {
+ // TODO: This should already have run coder inference for output, but may not have been consumed
+ // as input yet (and won't be fully specified); This is fine
+
+ // Assumes that the PCollection uses a KvCoder.
+ Coder<?> entryCoder = pc.getCoder();
+ if (!(entryCoder instanceof KvCoder<?, ?>)) {
+ throw new IllegalArgumentException("PCollection does not use a KvCoder");
}
+ @SuppressWarnings("unchecked")
+ KvCoder<K, V> coder = (KvCoder<K, V>) entryCoder;
+ return coder.getKeyCoder();
}
+
/////////////////////////////////////////////////////////////////////////////
/**
@@ -197,7 +205,7 @@ public class KeyedPCollectionTuple<K> implements PInput {
*/
private final List<TaggedKeyedPCollection<K, ?>> keyedCollections;
- private final Coder<K> keyCoder;
+ private Coder<K> keyCoder;
private final CoGbkResultSchema schema;
@@ -221,20 +229,6 @@ public class KeyedPCollectionTuple<K> implements PInput {
this.keyCoder = keyCoder;
}
- private static <K, V> Coder<K> getKeyCoder(PCollection<KV<K, V>> pc) {
- // Need to run coder inference on this PCollection before inspecting it.
- pc.finishSpecifying();
-
- // Assumes that the PCollection uses a KvCoder.
- Coder<?> entryCoder = pc.getCoder();
- if (!(entryCoder instanceof KvCoder<?, ?>)) {
- throw new IllegalArgumentException("PCollection does not use a KvCoder");
- }
- @SuppressWarnings("unchecked")
- KvCoder<K, V> coder = (KvCoder<K, V>) entryCoder;
- return coder.getKeyCoder();
- }
-
private static <K> List<TaggedKeyedPCollection<K, ?>> copyAddLast(
List<TaggedKeyedPCollection<K, ?>> keyedCollections,
TaggedKeyedPCollection<K, ?> taggedCollection) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
index 9aa4615..2ba0f1c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
@@ -69,11 +69,6 @@ public class PBegin implements PInput {
return Collections.emptyList();
}
- @Override
- public void finishSpecifying() {
- // Nothing more to be done.
- }
-
/////////////////////////////////////////////////////////////////////////////
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
index e4bb7c5..dcb64a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
@@ -234,17 +234,8 @@ public class PCollectionList<T> implements PInput, POutput {
}
@Override
- public void finishSpecifying() {
- for (TaggedPValue pc : pcollections) {
- pc.getValue().finishSpecifying();
- }
- }
-
- @Override
- public void finishSpecifyingOutput() {
- for (TaggedPValue pc : pcollections) {
- pc.getValue().finishSpecifyingOutput();
- }
+ public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
+ // All component PCollections will have already been finished.
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index 6afe59e..d61db51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -255,17 +255,8 @@ public class PCollectionTuple implements PInput, POutput {
}
@Override
- public void finishSpecifying() {
- for (PCollection<?> pc : pcollectionMap.values()) {
- pc.finishSpecifying();
- }
- }
-
- @Override
- public void finishSpecifyingOutput() {
- for (PCollection<?> pc : pcollectionMap.values()) {
- pc.finishSpecifyingOutput();
- }
+ public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
+ // All component PCollections will already have been finished
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
index a27b939..30d4297 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
@@ -44,13 +44,4 @@ public interface PInput {
* <p>Not intended to be invoked directly by user code.
*/
List<TaggedPValue> expand();
-
- /**
- * After building, finalizes this {@code PInput} to make it ready for
- * being used as an input to a {@link org.apache.beam.sdk.transforms.PTransform}.
- *
- * <p>Automatically invoked whenever {@code apply()} is invoked on
- * this {@code PInput}, so users do not normally call this explicitly.
- */
- void finishSpecifying();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
index e5d4504..062f565 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
@@ -61,16 +61,18 @@ public interface POutput {
void recordAsOutput(AppliedPTransform<?, ?, ?> transform);
/**
- * As part of applying the producing {@link PTransform}, finalizes this
- * output to make it ready for being used as an input and for running.
+ * As part of applying the producing {@link PTransform}, finalizes this output to make it ready
+ * for being used as an input and for running.
*
- * <p>This includes ensuring that all {@link PCollection PCollections}
- * have {@link org.apache.beam.sdk.coders.Coder Coders} specified or defaulted.
+ * <p>This includes ensuring that all {@link PCollection PCollections} have {@link
+ * org.apache.beam.sdk.coders.Coder Coders} specified or defaulted.
*
- * <p>Automatically invoked whenever this {@link POutput} is used
- * as a {@link PInput} to another {@link PTransform}, or if never
- * used as a {@link PInput}, when {@link Pipeline#run}
- * is called, so users do not normally call this explicitly.
+ * <p>Automatically invoked whenever this {@link POutput} is output, after {@link
+ * PValue#finishSpecifyingOutput(PInput, PTransform)} has been called on each component {@link
+ * PValue} returned by {@link #expand()}.
+ *
+ * @deprecated see BEAM-1199
*/
- void finishSpecifyingOutput();
+ @Deprecated
+ void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java
index 4772c47..cdef58c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java
@@ -89,12 +89,12 @@ public abstract class POutputValueBase implements POutput {
}
/**
- * Default behavior for {@link #finishSpecifyingOutput()} is
+ * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)} is
* to do nothing. Override if your {@link PValue} requires
* finalization.
*/
@Override
- public void finishSpecifyingOutput() { }
+ public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { }
/**
* The {@link PTransform} that produces this {@link POutputValueBase}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
index e6dbaf7..052a1f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
@@ -36,6 +36,7 @@ public interface PValue extends POutput, PInput {
*
* <p>For internal use only.
*/
+ @Deprecated
AppliedPTransform<?, ?, ?> getProducingTransformInternal();
/**
@@ -46,4 +47,18 @@ public interface PValue extends POutput, PInput {
*/
@Deprecated
List<TaggedPValue> expand();
+
+ /**
+ * After building, finalizes this {@code PValue} to make it ready for being used as an input to a
+ * {@link org.apache.beam.sdk.transforms.PTransform}.
+ *
+ * <p>Automatically invoked whenever {@code apply()} is invoked on this {@code PValue}, after
+ * {@link PValue#finishSpecifying(PInput, PTransform)} has been called on each component {@link
+ * PValue}, so users do not normally call this explicitly.
+ *
+ * @param upstreamInput the {@link PInput} to which the {@link PTransform} was applied to produce
+ * this output
+ * @param upstreamTransform the {@link PTransform} that produced this {@link PValue}
+ */
+ void finishSpecifying(PInput upstreamInput, PTransform<?, ?> upstreamTransform);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 3a10d5d..7b44737 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -138,8 +138,7 @@ public abstract class PValueBase extends POutputValueBase implements PValue {
}
@Override
- public void finishSpecifying() {
- finishSpecifyingOutput();
+ public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
finishedSpecifying = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
index 7afd0a1..de1b99c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
@@ -17,12 +17,15 @@
*/
package org.apache.beam.sdk.values;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -45,10 +48,8 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
* couldn't be inferred.
*/
public Coder<T> getCoder() {
- if (coder == null) {
- coder = inferCoderOrFail();
- }
- return coder;
+ checkState(coderOrFailure.coder != null, coderOrFailure.failure);
+ return coderOrFailure.coder;
}
/**
@@ -60,18 +61,18 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
* {@code apply()} called on it
*/
public TypedPValue<T> setCoder(Coder<T> coder) {
- if (isFinishedSpecifyingInternal()) {
- throw new IllegalStateException(
- "cannot change the Coder of " + this + " once it's been used");
- }
- if (coder == null) {
- throw new IllegalArgumentException(
- "Cannot setCoder(null)");
- }
- this.coder = coder;
+ checkState(
+ !isFinishedSpecifyingInternal(), "cannot change the Coder of %s once it's been used", this);
+ checkArgument(coder != null, "Cannot setCoder(null)");
+ this.coderOrFailure = new CoderOrFailure<>(coder, null);
return this;
}
+ @Override
+ public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
+ this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
+ }
+
/**
* After building, finalizes this {@link PValue} to make it ready for
* running. Automatically invoked whenever the {@link PValue} is "used"
@@ -79,24 +80,26 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
* run (useful if this is a {@link PValue} with no consumers).
*/
@Override
- public void finishSpecifying() {
+ public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
if (isFinishedSpecifyingInternal()) {
return;
}
- super.finishSpecifying();
+ this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
// Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not,
// this will throw an exception.
getCoder();
+ super.finishSpecifying(input, transform);
}
/////////////////////////////////////////////////////////////////////////////
// Internal details below here.
/**
- * The {@link Coder} used by this {@link TypedPValue} to encode and decode the
- * values stored in it, or null if not specified nor inferred yet.
+ * Holds the {@link Coder} used by this {@link TypedPValue} to encode and decode the values stored
+ * in it or, if no {@link Coder} has been specified or inferred, a message explaining why.
*/
- private Coder<T> coder;
+ private CoderOrFailure<T> coderOrFailure =
+ new CoderOrFailure<>(null, "No Coder was specified, and Coder Inference did not occur");
protected TypedPValue(Pipeline p) {
super(p);
@@ -125,34 +128,31 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
}
/**
- * If the coder is not explicitly set, this sets the coder for
- * this {@link TypedPValue} to the best coder that can be inferred
- * based upon the known {@link TypeDescriptor}. By default, this is null,
- * but can and should be improved by subclasses.
+ * Returns the explicitly set coder if present; otherwise infers one from the known {@link
+ * TypeDescriptor} (null by default, but subclasses can and should improve it) or from the
+ * producing {@link PTransform}, returning a failure message if no coder can be inferred.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
- private Coder<T> inferCoderOrFail() {
+ private CoderOrFailure<T> inferCoderOrFail(
+ PInput input, PTransform<?, ?> transform, CoderRegistry registry) {
// First option for a coder: use the Coder set on this PValue.
- if (coder != null) {
- return coder;
+ if (coderOrFailure.coder != null) {
+ return coderOrFailure;
}
- AppliedPTransform<?, ?, ?> application = getProducingTransformInternal();
-
// Second option for a coder: Look in the coder registry.
- CoderRegistry registry = getPipeline().getCoderRegistry();
TypeDescriptor<T> token = getTypeDescriptor();
CannotProvideCoderException inferFromTokenException = null;
if (token != null) {
try {
- return registry.getDefaultCoder(token);
+ return new CoderOrFailure<>(registry.getDefaultCoder(token), null);
} catch (CannotProvideCoderException exc) {
inferFromTokenException = exc;
// Attempt to detect when the token came from a TupleTag used for a ParDo side output,
// and provide a better error message if so. Unfortunately, this information is not
// directly available from the TypeDescriptor, so infer based on the type of the PTransform
// and the error message itself.
- if (application.getTransform() instanceof ParDo.BoundMulti
+ if (transform instanceof ParDo.BoundMulti
&& exc.getReason() == ReasonCode.TYPE_ERASURE) {
inferFromTokenException = new CannotProvideCoderException(exc.getMessage()
+ " If this error occurs for a side output of the producing ParDo, verify that the "
@@ -165,8 +165,8 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
// Third option for a coder: use the default Coder from the producing PTransform.
CannotProvideCoderException inputCoderException;
try {
- return ((PTransform) application.getTransform()).getDefaultOutputCoder(
- application.getInput(), this);
+ return new CoderOrFailure<>(
+ ((PTransform) transform).getDefaultOutputCoder(input, this), null);
} catch (CannotProvideCoderException exc) {
inputCoderException = exc;
}
@@ -193,6 +193,16 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
}
// Build and throw the exception.
- throw new IllegalStateException(messageBuilder.toString());
+ return new CoderOrFailure<>(null, messageBuilder.toString());
+ }
+
+ private static class CoderOrFailure<T> {
+ @Nullable private final Coder<T> coder;
+ @Nullable private final String failure;
+
+ public CoderOrFailure(@Nullable Coder<T> coder, @Nullable String failure) {
+ this.coder = coder;
+ this.failure = failure;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index d790d39..d373caf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -32,9 +33,9 @@ import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -54,13 +55,11 @@ import org.junit.runners.JUnit4;
* Tests for {@link TransformHierarchy}.
*/
@RunWith(JUnit4.class)
-public class TransformHierarchyTest {
-
- @Rule public final TestPipeline pipeline = TestPipeline.create();
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- private TransformHierarchy hierarchy;
+public class TransformHierarchyTest implements Serializable {
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+ private transient TransformHierarchy hierarchy;
@Before
public void setup() {
@@ -162,18 +161,21 @@ public class TransformHierarchyTest {
PCollection.createPrimitiveOutputInternal(
pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
- MapElements<Long, Long> map = MapElements.via(new SimpleFunction<Long, Long>() {
- @Override
- public Long apply(Long input) {
- return input;
- }
- });
+ ParDo.Bound<Long, Long> pardo =
+ ParDo.of(
+ new DoFn<Long, Long>() {
+ @ProcessElement
+ public void processElement(ProcessContext ctxt) {
+ ctxt.output(ctxt.element());
+ }
+ });
PCollection<Long> mapped =
PCollection.createPrimitiveOutputInternal(
pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create);
+ hierarchy.finishSpecifyingInput();
assertThat(hierarchy.getCurrent(), equalTo(compositeNode));
assertThat(compositeNode.getInputs(), Matchers.emptyIterable());
assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(create));
@@ -183,6 +185,7 @@ public class TransformHierarchyTest {
TransformHierarchy.Node primitiveNode = hierarchy.pushNode("Create/Read", begin, read);
assertThat(hierarchy.getCurrent(), equalTo(primitiveNode));
+ hierarchy.finishSpecifyingInput();
hierarchy.setOutput(created);
hierarchy.popNode();
assertThat(
@@ -199,7 +202,8 @@ public class TransformHierarchyTest {
assertThat(hierarchy.getProducer(created), equalTo(primitiveNode));
hierarchy.popNode();
- TransformHierarchy.Node otherPrimitive = hierarchy.pushNode("ParDo", created, map);
+ TransformHierarchy.Node otherPrimitive = hierarchy.pushNode("ParDo", created, pardo);
+ hierarchy.finishSpecifyingInput();
hierarchy.setOutput(mapped);
hierarchy.popNode();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index fa8874c..d95b2d0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1147,15 +1147,16 @@ public class ParDoTest implements Serializable {
final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main");
final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("unregisteredSide");
- PCollectionTuple outputTuple = input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag))
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+ ParDo.BoundMulti<Integer, Integer> pardo = ParDo.of(new SideOutputDummyFn(sideOutputTag))
+ .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag));
+ PCollectionTuple outputTuple = input.apply(pardo);
outputTuple.get(sideOutputTag).setCoder(new TestDummyCoder());
outputTuple.get(sideOutputTag).apply(View.<TestDummy>asSingleton());
assertEquals(new TestDummyCoder(), outputTuple.get(sideOutputTag).getCoder());
- outputTuple.get(sideOutputTag).finishSpecifyingOutput(); // Check for crashes
+ outputTuple.get(sideOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes
assertEquals(new TestDummyCoder(),
outputTuple.get(sideOutputTag).getCoder()); // Check for corruption
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index 8381f12..5e7cc7d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -159,13 +159,16 @@ public class TypedPValueTest {
@Test
public void testFinishSpecifyingShouldFailIfNoCoderInferrable() {
+ p.enableAbandonedNodeEnforcement(false);
+ PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+ ParDo.Bound<Integer, EmptyClass> uninferrableParDo = ParDo.of(new EmptyClassDoFn());
PCollection<EmptyClass> unencodable =
- p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn()));
+ created.apply(uninferrableParDo);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to return a default Coder");
thrown.expectMessage("Inferring a Coder from the CoderRegistry failed");
- unencodable.finishSpecifying();
+ unencodable.finishSpecifying(created, uninferrableParDo);
}
}