You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/12/08 00:14:38 UTC
[beam] branch master updated: [BEAM-6195] Make
ProcessRemoteBundleOperation map PCollectionId into correct OutputReceiver
and throw an Exception when there is more than one input PCollection. (#7223)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 17968a2 [BEAM-6195] Make ProcessRemoteBundleOperation map PCollectionId into correct OutputReceiver and throw an Exception when there is more than one input PCollection. (#7223)
17968a2 is described below
commit 17968a27c02d990f4880e8247a88508ff83c6cfd
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Fri Dec 7 16:14:31 2018 -0800
[BEAM-6195] Make ProcessRemoteBundleOperation map PCollectionId into correct OutputReceiver and throw an Exception when there is more than one input PCollection. (#7223)
---
.../dataflow/worker/BatchDataflowWorker.java | 2 +-
.../worker/BeamFnMapTaskExecutorFactory.java | 17 ++---
.../worker/IntrinsicMapTaskExecutorFactory.java | 2 +-
.../dataflow/worker/StreamingDataflowWorker.java | 2 +-
.../fn/control/ProcessRemoteBundleOperation.java | 20 +++---
.../graph/CloneAmbiguousFlattensFunction.java | 9 ++-
.../graph/CreateExecutableStageNodeFunction.java | 6 +-
.../graph/CreateRegisterFnOperationFunction.java | 6 +-
...nsertFetchAndFilterStreamingSideInputNodes.java | 3 +-
.../worker/graph/LengthPrefixUnknownCoders.java | 4 +-
.../worker/graph/MapTaskToNetworkFunction.java | 11 +++-
.../beam/runners/dataflow/worker/graph/Nodes.java | 19 ++++--
.../IntrinsicMapTaskExecutorFactoryTest.java | 18 ++++--
.../graph/CloneAmbiguousFlattensFunctionTest.java | 2 +-
.../CreateRegisterFnOperationFunctionTest.java | 2 +-
.../graph/DeduceFlattenLocationsFunctionTest.java | 2 +-
.../graph/DeduceNodeLocationsFunctionTest.java | 4 +-
...tFetchAndFilterStreamingSideInputNodesTest.java | 7 +-
.../graph/LengthPrefixUnknownCodersTest.java | 2 +-
.../worker/graph/MapTaskToNetworkFunctionTest.java | 25 +++++---
.../runners/dataflow/worker/graph/NodesTest.java | 16 +++--
.../RemoveFlattenInstructionsFunctionTest.java | 74 +++++++++++++++-------
.../ReplacePgbkWithPrecombineFunctionTest.java | 2 +-
23 files changed, 168 insertions(+), 87 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index f0e8ddb..00504d2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -97,7 +97,7 @@ public class BatchDataflowWorker implements Closeable {
*/
private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToBaseNetwork =
new FixMultiOutputInfosOnParDoInstructions(idGenerator)
- .andThen(new MapTaskToNetworkFunction());
+ .andThen(new MapTaskToNetworkFunction(idGenerator));
/** Registry of known {@link ReaderFactory ReaderFactories}. */
private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 78bba56..f2d31e8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -37,11 +37,11 @@ import com.google.common.graph.MutableNetwork;
import com.google.common.graph.Network;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
-import java.util.stream.Collectors;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.ElementByteSizeObservable;
@@ -338,19 +338,20 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
Iterable<OutputReceiverNode> outputReceiverNodes =
Iterables.filter(network.successors(input), OutputReceiverNode.class);
- OutputReceiver[] outputReceivers = new OutputReceiver[Iterables.size(outputReceiverNodes)];
+ Map<String, OutputReceiver> outputReceiverMap = new HashMap<>();
Lists.newArrayList(outputReceiverNodes)
.stream()
- .map(outputReceiverNode -> outputReceiverNode.getOutputReceiver())
- .collect(Collectors.toList())
- .toArray(outputReceivers);
-
+ .forEach(
+ outputReceiverNode ->
+ outputReceiverMap.put(
+ outputReceiverNode.getPcollectionId(),
+ outputReceiverNode.getOutputReceiver()));
return OperationNode.create(
new ProcessRemoteBundleOperation(
executionContext.createOperationContext(
NameContext.create(stageName, stageName, stageName, stageName)),
stageBundleFactory,
- outputReceivers));
+ outputReceiverMap));
}
};
}
@@ -667,7 +668,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
cloudOutput.getName()));
outputReceiver.addOutputCounter(outputCounter);
- return OutputReceiverNode.create(outputReceiver, coder);
+ return OutputReceiverNode.create(outputReceiver, coder, input.getPcollectionId());
}
};
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
index 4ba66ba..6285dfd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
@@ -357,7 +357,7 @@ public class IntrinsicMapTaskExecutorFactory implements DataflowMapTaskExecutorF
cloudOutput.getName()));
outputReceiver.addOutputCounter(outputCounter);
- return OutputReceiverNode.create(outputReceiver, coder);
+ return OutputReceiverNode.create(outputReceiver, coder, input.getPcollectionId());
}
};
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index d6de907..f32fa746 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -162,7 +162,7 @@ public class StreamingDataflowWorker {
* </ul>
*/
private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToBaseNetwork =
- new MapTaskToNetworkFunction();
+ new MapTaskToNetworkFunction(idGenerator);
// Maximum number of threads for processing. Currently each thread processes one key at a time.
static final int MAX_PROCESSING_THREADS = 300;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
index ae8a13e..8161970 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker.fn.control;
import com.google.common.collect.Iterables;
import java.io.Closeable;
+import java.util.Map;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ReceivingOperation;
@@ -41,15 +42,15 @@ import org.slf4j.LoggerFactory;
public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
private static final Logger LOG = LoggerFactory.getLogger(ProcessRemoteBundleOperation.class);
private final StageBundleFactory stageBundleFactory;
+ private static final OutputReceiver[] EMPTY_RECEIVER_ARRAY = new OutputReceiver[0];
+ private final Map<String, OutputReceiver> outputReceiverMap;
private final OutputReceiverFactory receiverFactory =
new OutputReceiverFactory() {
@Override
public FnDataReceiver<?> create(String pCollectionId) {
return receivedElement -> {
- for (OutputReceiver receiver : receivers) {
- LOG.debug("Consume element {}", receivedElement);
- receiver.process((WindowedValue<?>) receivedElement);
- }
+ LOG.debug("Consume element {}", receivedElement);
+ outputReceiverMap.get(pCollectionId).process((WindowedValue<?>) receivedElement);
};
}
};
@@ -58,11 +59,14 @@ public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
private RemoteBundle remoteBundle;
public ProcessRemoteBundleOperation(
- OperationContext context, StageBundleFactory stageBundleFactory, OutputReceiver[] receivers) {
- super(receivers, context);
+ OperationContext context,
+ StageBundleFactory stageBundleFactory,
+ Map<String, OutputReceiver> outputReceiverMap) {
+ super(EMPTY_RECEIVER_ARRAY, context);
this.stageBundleFactory = stageBundleFactory;
- stateRequestHandler = StateRequestHandler.unsupported();
- progressHandler = BundleProgressHandler.ignored();
+ this.outputReceiverMap = outputReceiverMap;
+ this.stateRequestHandler = StateRequestHandler.unsupported();
+ this.progressHandler = BundleProgressHandler.ignored();
}
@Override
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java
index 7b356e8..04edc4e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java
@@ -89,21 +89,24 @@ public class CloneAmbiguousFlattensFunction
*/
private void cloneFlatten(Node flatten, MutableNetwork<Node, Edge> network) {
// Start by creating the clones of the flatten and its PCollection.
- Node flattenOut = Iterables.getOnlyElement(network.successors(flatten));
+ InstructionOutputNode flattenOut =
+ (InstructionOutputNode) Iterables.getOnlyElement(network.successors(flatten));
ParallelInstruction flattenInstruction =
((ParallelInstructionNode) flatten).getParallelInstruction();
Node runnerFlatten =
ParallelInstructionNode.create(flattenInstruction, ExecutionLocation.RUNNER_HARNESS);
Node runnerFlattenOut =
- InstructionOutputNode.create(((InstructionOutputNode) flattenOut).getInstructionOutput());
+ InstructionOutputNode.create(
+ flattenOut.getInstructionOutput(), flattenOut.getPcollectionId());
network.addNode(runnerFlatten);
network.addNode(runnerFlattenOut);
Node sdkFlatten =
ParallelInstructionNode.create(flattenInstruction, ExecutionLocation.SDK_HARNESS);
Node sdkFlattenOut =
- InstructionOutputNode.create(((InstructionOutputNode) flattenOut).getInstructionOutput());
+ InstructionOutputNode.create(
+ flattenOut.getInstructionOutput(), flattenOut.getPcollectionId());
network.addNode(sdkFlatten);
network.addNode(sdkFlattenOut);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index c93e9e9..81d0295 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -227,7 +227,7 @@ public class CreateExecutableStageNodeFunction
e);
}
- String pcollectionId = "generatedPcollection" + idGenerator.getId();
+ String pcollectionId = node.getPcollectionId();
RunnerApi.PCollection pCollection =
RunnerApi.PCollection.newBuilder()
.setCoderId(coderId)
@@ -351,6 +351,10 @@ public class CreateExecutableStageNodeFunction
executableStageTransforms.add(PipelineNode.pTransform(ptransformId, pTransform.build()));
}
+ if (executableStageInputs.size() != 1) {
+ throw new UnsupportedOperationException("ExecutableStage only support one input PCollection");
+ }
+
PCollectionNode executableInput = executableStageInputs.iterator().next();
RunnerApi.Components executableStageComponents = componentsBuilder.build();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
index 5a1dffb..996a3e2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
@@ -261,9 +261,11 @@ public class CreateRegisterFnOperationFunction
Set<Node> successors) {
InstructionOutputNode newPredecessorOutputNode =
- InstructionOutputNode.create(outputNode.getInstructionOutput());
+ InstructionOutputNode.create(
+ outputNode.getInstructionOutput(), outputNode.getPcollectionId());
InstructionOutputNode portOutputNode =
- InstructionOutputNode.create(outputNode.getInstructionOutput());
+ InstructionOutputNode.create(
+ outputNode.getInstructionOutput(), outputNode.getPcollectionId());
String predecessorPortEdgeId = idGenerator.getId();
String successorPortEdgeId = idGenerator.getId();
Node portNode = portSupplier.apply(predecessorPortEdgeId, successorPortEdgeId);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java
index fab79af..2803621 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java
@@ -158,7 +158,8 @@ public class InsertFetchAndFilterStreamingSideInputNodes {
InstructionOutputNode predecessor =
(InstructionOutputNode) network.incidentNodes(mainInput).source();
InstructionOutputNode predecessorCopy =
- InstructionOutputNode.create(predecessor.getInstructionOutput());
+ InstructionOutputNode.create(
+ predecessor.getInstructionOutput(), predecessor.getPcollectionId());
network.removeEdge(mainInput);
network.addNode(streamingSideInputWindowHandlerNode);
network.addNode(predecessorCopy);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
index 398037e..0487438 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
@@ -153,7 +153,7 @@ public class LengthPrefixUnknownCoders {
e);
}
}
- return InstructionOutputNode.create(cloudOutput);
+ return InstructionOutputNode.create(cloudOutput, input.getPcollectionId());
}
};
}
@@ -179,7 +179,7 @@ public class LengthPrefixUnknownCoders {
input.getInstructionOutput()),
e);
}
- return InstructionOutputNode.create(instructionOutput);
+ return InstructionOutputNode.create(instructionOutput, input.getPcollectionId());
}
};
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java
index 0856cc8..b9212bc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java
@@ -36,6 +36,7 @@ import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
import org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.util.Transport;
/**
@@ -63,6 +64,12 @@ public class MapTaskToNetworkFunction implements Function<MapTask, MutableNetwor
}
}
+ private final IdGenerator idGenerator;
+
+ public MapTaskToNetworkFunction(IdGenerator idGenerator) {
+ this.idGenerator = idGenerator;
+ }
+
@Override
public MutableNetwork<Node, Edge> apply(MapTask mapTask) {
List<ParallelInstruction> parallelInstructions = Apiary.listOrEmpty(mapTask.getInstructions());
@@ -98,7 +105,9 @@ public class MapTaskToNetworkFunction implements Function<MapTask, MutableNetwor
// Connect the instruction node output to the output PCollection node
for (int j = 0; j < outputs.size(); ++j) {
InstructionOutput instructionOutput = outputs.get(j);
- InstructionOutputNode outputNode = InstructionOutputNode.create(instructionOutput);
+ InstructionOutputNode outputNode =
+ InstructionOutputNode.create(
+ instructionOutput, "generatedPcollection" + this.idGenerator.getId());
network.addNode(outputNode);
if (parallelInstruction.getParDo() != null) {
network.addEdge(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
index 5d058a97..a328e22 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
@@ -206,20 +206,25 @@ public class Nodes {
public abstract ExecutionLocation getExecutionLocation();
}
- /** A node that stores {@link InstructionOutput}s. */
+ /** A node that stores {@link InstructionOutput}s with the corresponding PCollection id. */
@AutoValue
public abstract static class InstructionOutputNode extends Node {
- public static InstructionOutputNode create(InstructionOutput instructionOutput) {
+ public static InstructionOutputNode create(
+ InstructionOutput instructionOutput, String pcollectionId) {
checkNotNull(instructionOutput);
- return new AutoValue_Nodes_InstructionOutputNode(instructionOutput);
+ checkNotNull(pcollectionId);
+ return new AutoValue_Nodes_InstructionOutputNode(instructionOutput, pcollectionId);
}
public abstract InstructionOutput getInstructionOutput();
+ public abstract String getPcollectionId();
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("instructionOutput", toStringWithTrimmedLiterals(getInstructionOutput()))
+ .add("pcollectionId", getPcollectionId())
.toString();
}
}
@@ -227,14 +232,18 @@ public class Nodes {
/** A node that stores {@link OutputReceiver}s. */
@AutoValue
public abstract static class OutputReceiverNode extends Node {
- public static OutputReceiverNode create(OutputReceiver outputReceiver, Coder<?> coder) {
+ public static OutputReceiverNode create(
+ OutputReceiver outputReceiver, Coder<?> coder, String pcollectionId) {
checkNotNull(outputReceiver);
- return new AutoValue_Nodes_OutputReceiverNode(outputReceiver, coder);
+ checkNotNull(pcollectionId);
+ return new AutoValue_Nodes_OutputReceiverNode(outputReceiver, coder, pcollectionId);
}
public abstract OutputReceiver getOutputReceiver();
public abstract Coder<?> getCoder();
+
+ public abstract String getPcollectionId();
}
/** A node that stores {@link Operation}s. */
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 182ad50..95836c5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -120,7 +120,7 @@ public class IntrinsicMapTaskExecutorFactoryTest {
private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToNetwork =
new FixMultiOutputInfosOnParDoInstructions(idGenerator)
- .andThen(new MapTaskToNetworkFunction());
+ .andThen(new MapTaskToNetworkFunction(idGenerator));
private static final CloudObject windowedStringCoder =
CloudObjects.asCloudObject(
@@ -130,6 +130,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
private PipelineOptions options;
private ReaderRegistry readerRegistry;
private SinkRegistry sinkRegistry;
+ private static final String PCOLLECTION_ID = "fakeId";
+
@Mock private Network<Node, Edge> network;
@Mock private CounterUpdateExtractor<?> updateExtractor;
@@ -326,7 +328,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, counterSet)
.apply(
InstructionOutputNode.create(
- instructionNode.getParallelInstruction().getOutputs().get(0)))));
+ instructionNode.getParallelInstruction().getOutputs().get(0),
+ PCOLLECTION_ID))));
when(network.outDegree(instructionNode)).thenReturn(1);
Node operationNode =
@@ -524,7 +527,7 @@ public class IntrinsicMapTaskExecutorFactoryTest {
IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, counterSet)
.apply(
InstructionOutputNode.create(
- instructionNode.getParallelInstruction().getOutputs().get(0)));
+ instructionNode.getParallelInstruction().getOutputs().get(0), PCOLLECTION_ID));
when(network.successors(instructionNode)).thenReturn(ImmutableSet.of(outputReceiverNode));
when(network.outDegree(instructionNode)).thenReturn(1);
@@ -601,7 +604,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, counterSet)
.apply(
InstructionOutputNode.create(
- instructionNode.getParallelInstruction().getOutputs().get(0)))));
+ instructionNode.getParallelInstruction().getOutputs().get(0),
+ PCOLLECTION_ID))));
when(network.outDegree(instructionNode)).thenReturn(1);
Node operationNode =
@@ -652,7 +656,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, counterSet)
.apply(
InstructionOutputNode.create(
- instructionNode.getParallelInstruction().getOutputs().get(0)))));
+ instructionNode.getParallelInstruction().getOutputs().get(0),
+ PCOLLECTION_ID))));
when(network.outDegree(instructionNode)).thenReturn(1);
Node operationNode =
@@ -729,7 +734,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, counterSet)
.apply(
InstructionOutputNode.create(
- instructionNode.getParallelInstruction().getOutputs().get(0)))));
+ instructionNode.getParallelInstruction().getOutputs().get(0),
+ PCOLLECTION_ID))));
when(network.outDegree(instructionNode)).thenReturn(1);
Node operationNode =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java
index b74a287..04f1214 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java
@@ -382,7 +382,7 @@ public final class CloneAmbiguousFlattensFunctionTest {
/** Creates an {@link InstructionOutputNode} to act as a PCollection. */
private static InstructionOutputNode createPCollection(String name) {
- return InstructionOutputNode.create(new InstructionOutput().setName(name));
+ return InstructionOutputNode.create(new InstructionOutput().setName(name), "fakeId");
}
/** Creates a {@link NoLocationNode} to use for testing nodes that have no ExecutionLocation */
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
index 87d4035..ec24fd7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
@@ -535,7 +535,7 @@ public class CreateRegisterFnOperationFunctionTest {
}
private static InstructionOutputNode createInstructionOutputNode(String name) {
- return InstructionOutputNode.create(new InstructionOutput().setName(name));
+ return InstructionOutputNode.create(new InstructionOutput().setName(name), "fakeId");
}
/** A named node to easily differentiate graph construction problems during testing. */
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java
index 44fd072..c1c5df0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java
@@ -381,7 +381,7 @@ public final class DeduceFlattenLocationsFunctionTest {
/** Creates an {@link InstructionOutputNode} to act as a PCollection. */
private static InstructionOutputNode createPCollection(String name) {
- return InstructionOutputNode.create(new InstructionOutput().setName(name));
+ return InstructionOutputNode.create(new InstructionOutput().setName(name), "fakeID");
}
private static ExecutionLocation getExecutionLocationOf(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java
index 4717cca..7d8177d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java
@@ -198,9 +198,9 @@ public final class DeduceNodeLocationsFunctionTest {
// --> Flatten --> D
// B --> out2 --/-->C
Node a = createReadNode("A", CUSTOM_SOURCE);
- Node out1 = InstructionOutputNode.create(new InstructionOutput());
+ Node out1 = InstructionOutputNode.create(new InstructionOutput(), "fakeId");
Node b = createReadNode("B", RUNNER_SOURCE);
- Node out2 = InstructionOutputNode.create(new InstructionOutput());
+ Node out2 = InstructionOutputNode.create(new InstructionOutput(), "fakeId");
Node c = createParDoNode("C", "RunnerDoFn");
Node flatten =
ParallelInstructionNode.create(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java
index 6f727ee..8d39fc3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java
@@ -93,7 +93,8 @@ public class InsertFetchAndFilterStreamingSideInputNodesTest {
RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(p);
Node predecessor = createParDoNode("predecessor");
- InstructionOutputNode mainInput = InstructionOutputNode.create(new InstructionOutput());
+ InstructionOutputNode mainInput =
+ InstructionOutputNode.create(new InstructionOutput(), "fakeId");
Node sideInputParDo = createParDoNode(findParDoWithSideInput(pipeline));
MutableNetwork<Node, Edge> network = createEmptyNetwork();
@@ -106,7 +107,7 @@ public class InsertFetchAndFilterStreamingSideInputNodesTest {
Network<Node, Edge> inputNetwork = ImmutableNetwork.copyOf(network);
network = InsertFetchAndFilterStreamingSideInputNodes.with(pipeline).forNetwork(network);
- Node mainInputClone = InstructionOutputNode.create(mainInput.getInstructionOutput());
+ Node mainInputClone = InstructionOutputNode.create(mainInput.getInstructionOutput(), "fakeId");
Node fetchAndFilter =
FetchAndFilterStreamingSideInputsNode.create(
pcView.getWindowingStrategyInternal(),
@@ -139,7 +140,7 @@ public class InsertFetchAndFilterStreamingSideInputNodesTest {
RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(p);
Node predecessor = createParDoNode("predecessor");
- Node mainInput = InstructionOutputNode.create(new InstructionOutput());
+ Node mainInput = InstructionOutputNode.create(new InstructionOutput(), "fakeId");
Node sideInputParDo = createParDoNode("noSideInput");
MutableNetwork<Node, Edge> network = createEmptyNetwork();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
index 661e394..0ee0d45 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
@@ -367,6 +367,6 @@ public class LengthPrefixUnknownCodersTest {
.setName(name)
.setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
instructionOutput.setFactory(new JacksonFactory());
- return InstructionOutputNode.create(instructionOutput);
+ return InstructionOutputNode.create(instructionOutput, "fakeId");
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java
index 19b4556..8daa494 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
import org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.util.Transport;
import org.hamcrest.Matchers;
import org.junit.Test;
@@ -58,7 +59,8 @@ import org.junit.runners.JUnit4;
public class MapTaskToNetworkFunctionTest {
@Test
public void testEmptyMapTask() {
- Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(new MapTask());
+ Network<Node, Edge> network =
+ new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(new MapTask());
assertTrue(network.isDirected());
assertTrue(network.allowsParallelEdges());
assertFalse(network.allowsSelfLoops());
@@ -75,7 +77,8 @@ public class MapTaskToNetworkFunctionTest {
mapTask.setInstructions(ImmutableList.of(read));
mapTask.setFactory(Transport.getJsonFactory());
- Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+ Network<Node, Edge> network =
+ new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
assertNetworkProperties(network);
assertEquals(2, network.nodes().size());
assertEquals(1, network.edges().size());
@@ -103,7 +106,8 @@ public class MapTaskToNetworkFunctionTest {
mapTask.setInstructions(ImmutableList.of(read, parDo));
mapTask.setFactory(Transport.getJsonFactory());
- Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+ Network<Node, Edge> network =
+ new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
assertNetworkProperties(network);
assertEquals(4, network.nodes().size());
assertEquals(3, network.edges().size());
@@ -149,7 +153,8 @@ public class MapTaskToNetworkFunctionTest {
mapTask.setInstructions(ImmutableList.of(readA, readB, flatten));
mapTask.setFactory(Transport.getJsonFactory());
- Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+ Network<Node, Edge> network =
+ new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
assertNetworkProperties(network);
assertEquals(6, network.nodes().size());
assertEquals(5, network.edges().size());
@@ -193,7 +198,8 @@ public class MapTaskToNetworkFunctionTest {
mapTask.setInstructions(ImmutableList.of(read, flatten));
mapTask.setFactory(Transport.getJsonFactory());
- Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+ Network<Node, Edge> network =
+ new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
assertNetworkProperties(network);
assertEquals(4, network.nodes().size());
assertEquals(5, network.edges().size());
@@ -225,7 +231,8 @@ public class MapTaskToNetworkFunctionTest {
mapTask.setInstructions(ImmutableList.of(read, write));
mapTask.setFactory(Transport.getJsonFactory());
- Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+ Network<Node, Edge> network =
+ new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
assertNetworkProperties(network);
assertEquals(3, network.nodes().size());
assertEquals(2, network.edges().size());
@@ -260,7 +267,8 @@ public class MapTaskToNetworkFunctionTest {
mapTask.setInstructions(ImmutableList.of(read, pgbk, write));
mapTask.setFactory(Transport.getJsonFactory());
- Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+ Network<Node, Edge> network =
+ new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
assertNetworkProperties(network);
assertEquals(5, network.nodes().size());
assertEquals(4, network.edges().size());
@@ -310,7 +318,8 @@ public class MapTaskToNetworkFunctionTest {
mapTask.setInstructions(ImmutableList.of(read, parDo, writeA, writeB));
mapTask.setFactory(Transport.getJsonFactory());
- Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(mapTask);
+ Network<Node, Edge> network =
+ new MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
assertNetworkProperties(network);
assertEquals(7, network.nodes().size());
assertEquals(6, network.edges().size());
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
index 6da18d7..b27520d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
@@ -57,6 +57,8 @@ import org.junit.runners.JUnit4;
/** Tests for {@link Nodes}. */
@RunWith(JUnit4.class)
public class NodesTest {
+ private static final String PCOLLECTION_ID = "fakeId";
+
@Test
public void testParallelInstructionNode() {
ParallelInstruction param = new ParallelInstruction();
@@ -73,18 +75,22 @@ public class NodesTest {
@Test
public void testInstructionOutputNode() {
InstructionOutput param = new InstructionOutput();
- assertSame(param, InstructionOutputNode.create(param).getInstructionOutput());
- assertNotEquals(InstructionOutputNode.create(param), InstructionOutputNode.create(param));
+ assertSame(param, InstructionOutputNode.create(param, PCOLLECTION_ID).getInstructionOutput());
+ assertNotEquals(
+ InstructionOutputNode.create(param, PCOLLECTION_ID),
+ InstructionOutputNode.create(param, PCOLLECTION_ID));
}
@Test
public void testOutputReceiverNode() {
OutputReceiver receiver = new OutputReceiver();
Coder<?> coder = StringUtf8Coder.of();
- assertSame(receiver, OutputReceiverNode.create(receiver, coder).getOutputReceiver());
- assertSame(coder, OutputReceiverNode.create(receiver, coder).getCoder());
+ assertSame(
+ receiver, OutputReceiverNode.create(receiver, coder, PCOLLECTION_ID).getOutputReceiver());
+ assertSame(coder, OutputReceiverNode.create(receiver, coder, PCOLLECTION_ID).getCoder());
assertNotEquals(
- OutputReceiverNode.create(receiver, coder), OutputReceiverNode.create(receiver, coder));
+ OutputReceiverNode.create(receiver, coder, PCOLLECTION_ID),
+ OutputReceiverNode.create(receiver, coder, PCOLLECTION_ID));
}
@Test
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java
index bfe3983..b00fb29 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java
@@ -46,6 +46,8 @@ import org.junit.runners.JUnit4;
/** Tests for {@link RemoveFlattenInstructionsFunction}. */
@RunWith(JUnit4.class)
public class RemoveFlattenInstructionsFunctionTest {
+ private static final String PCOLLECTION_ID = "fakeId";
+
@Test
public void testEmptyNetwork() {
assertTrue(
@@ -59,24 +61,28 @@ public class RemoveFlattenInstructionsFunctionTest {
Node a =
ParallelInstructionNode.create(
new ParallelInstruction().setName("A"), Nodes.ExecutionLocation.UNKNOWN);
- Node aPCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out"));
+ Node aPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("A.out"), PCOLLECTION_ID);
Edge aOutput = DefaultEdge.create();
Node b =
ParallelInstructionNode.create(
new ParallelInstruction().setName("B"), Nodes.ExecutionLocation.UNKNOWN);
Edge bOutput = DefaultEdge.create();
- Node bPCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out"));
+ Node bPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("B.out"), PCOLLECTION_ID);
Node flatten =
ParallelInstructionNode.create(
new ParallelInstruction().setName("Flatten").setFlatten(new FlattenInstruction()),
Nodes.ExecutionLocation.UNKNOWN);
Node flattenPCollection =
- InstructionOutputNode.create(new InstructionOutput().setName("Flatten.out"));
+ InstructionOutputNode.create(
+ new InstructionOutput().setName("Flatten.out"), PCOLLECTION_ID);
Node c =
ParallelInstructionNode.create(
new ParallelInstruction().setName("C"), Nodes.ExecutionLocation.UNKNOWN);
Edge cOutput = DefaultEdge.create();
- Node cPCollection = InstructionOutputNode.create(new InstructionOutput().setName("C.out"));
+ Node cPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("C.out"), PCOLLECTION_ID);
// A --\
// Flatten --> C
@@ -109,9 +115,12 @@ public class RemoveFlattenInstructionsFunctionTest {
Node a =
ParallelInstructionNode.create(
new ParallelInstruction().setName("A"), Nodes.ExecutionLocation.UNKNOWN);
- Node aOut1PCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out1"));
- Node aOut2PCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out2"));
- Node aOut3PCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out3"));
+ Node aOut1PCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("A.out1"), PCOLLECTION_ID);
+ Node aOut2PCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("A.out2"), PCOLLECTION_ID);
+ Node aOut3PCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("A.out3"), PCOLLECTION_ID);
Edge aOut1 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out1"));
Edge aOut2 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out2"));
Edge aOut3 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out3"));
@@ -119,8 +128,10 @@ public class RemoveFlattenInstructionsFunctionTest {
Node b =
ParallelInstructionNode.create(
new ParallelInstruction().setName("B"), Nodes.ExecutionLocation.UNKNOWN);
- Node bOut1PCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out1"));
- Node bOut2PCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out1"));
+ Node bOut1PCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
+ Node bOut2PCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
Edge bOut1 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out1"));
Edge bOut2 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out2"));
Edge bOut1PCollectionEdge = DefaultEdge.create();
@@ -129,22 +140,26 @@ public class RemoveFlattenInstructionsFunctionTest {
new ParallelInstruction().setName("Flatten").setFlatten(new FlattenInstruction()),
Nodes.ExecutionLocation.UNKNOWN);
Node flattenPCollection =
- InstructionOutputNode.create(new InstructionOutput().setName("Flatten.out"));
+ InstructionOutputNode.create(
+ new InstructionOutput().setName("Flatten.out"), PCOLLECTION_ID);
Node c =
ParallelInstructionNode.create(
new ParallelInstruction().setName("C"), Nodes.ExecutionLocation.UNKNOWN);
Edge cOutput = DefaultEdge.create();
- Node cPCollection = InstructionOutputNode.create(new InstructionOutput().setName("C.out"));
+ Node cPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("C.out"), PCOLLECTION_ID);
Node d =
ParallelInstructionNode.create(
new ParallelInstruction().setName("D"), Nodes.ExecutionLocation.UNKNOWN);
Edge dOutput = DefaultEdge.create();
- Node dPCollection = InstructionOutputNode.create(new InstructionOutput().setName("D.out"));
+ Node dPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("D.out"), PCOLLECTION_ID);
Node e =
ParallelInstructionNode.create(
new ParallelInstruction().setName("E"), Nodes.ExecutionLocation.UNKNOWN);
Edge eOutput = DefaultEdge.create();
- Node ePCollection = InstructionOutputNode.create(new InstructionOutput().setName("E.out"));
+ Node ePCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("E.out"), PCOLLECTION_ID);
// /-out1-> C
// A -out2-\
@@ -196,13 +211,16 @@ public class RemoveFlattenInstructionsFunctionTest {
Node a =
ParallelInstructionNode.create(
new ParallelInstruction().setName("A"), Nodes.ExecutionLocation.UNKNOWN);
- Node aPCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out"));
+ Node aPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("A.out"), PCOLLECTION_ID);
Edge aOutput = DefaultEdge.create();
Node b =
ParallelInstructionNode.create(
new ParallelInstruction().setName("B"), Nodes.ExecutionLocation.UNKNOWN);
- Node bOut1PCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out1"));
- Node bOut2PCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out1"));
+ Node bOut1PCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
+ Node bOut2PCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
Edge bOut1 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out1"));
Edge bOut2 = MultiOutputInfoEdge.create(new MultiOutputInfo().setTag("out2"));
Node flatten1 =
@@ -210,18 +228,21 @@ public class RemoveFlattenInstructionsFunctionTest {
new ParallelInstruction().setName("Flatten1").setFlatten(new FlattenInstruction()),
Nodes.ExecutionLocation.UNKNOWN);
Node flatten1PCollection =
- InstructionOutputNode.create(new InstructionOutput().setName("Flatten1.out"));
+ InstructionOutputNode.create(
+ new InstructionOutput().setName("Flatten1.out"), PCOLLECTION_ID);
Node flatten2 =
ParallelInstructionNode.create(
new ParallelInstruction().setName("Flatten2").setFlatten(new FlattenInstruction()),
Nodes.ExecutionLocation.UNKNOWN);
Node flatten2PCollection =
- InstructionOutputNode.create(new InstructionOutput().setName("Flatten2.out"));
+ InstructionOutputNode.create(
+ new InstructionOutput().setName("Flatten2.out"), PCOLLECTION_ID);
Node c =
ParallelInstructionNode.create(
new ParallelInstruction().setName("C"), Nodes.ExecutionLocation.UNKNOWN);
Edge cOutput = DefaultEdge.create();
- Node cPCollection = InstructionOutputNode.create(new InstructionOutput().setName("C.out"));
+ Node cPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("C.out"), PCOLLECTION_ID);
// A ------\
// Flatten1 --\
@@ -262,29 +283,34 @@ public class RemoveFlattenInstructionsFunctionTest {
Node a =
ParallelInstructionNode.create(
new ParallelInstruction().setName("A"), Nodes.ExecutionLocation.UNKNOWN);
- Node aPCollection = InstructionOutputNode.create(new InstructionOutput().setName("A.out"));
+ Node aPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("A.out"), PCOLLECTION_ID);
Edge aOutput = DefaultEdge.create();
Node b =
ParallelInstructionNode.create(
new ParallelInstruction().setName("B"), Nodes.ExecutionLocation.UNKNOWN);
Edge bOutput = DefaultEdge.create();
- Node bPCollection = InstructionOutputNode.create(new InstructionOutput().setName("B.out"));
+ Node bPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("B.out"), PCOLLECTION_ID);
Node flatten =
ParallelInstructionNode.create(
new ParallelInstruction().setName("Flatten").setFlatten(new FlattenInstruction()),
Nodes.ExecutionLocation.UNKNOWN);
Node flattenPCollection =
- InstructionOutputNode.create(new InstructionOutput().setName("Flatten.out"));
+ InstructionOutputNode.create(
+ new InstructionOutput().setName("Flatten.out"), PCOLLECTION_ID);
Node c =
ParallelInstructionNode.create(
new ParallelInstruction().setName("C"), Nodes.ExecutionLocation.UNKNOWN);
Edge cOutput = DefaultEdge.create();
- Node cPCollection = InstructionOutputNode.create(new InstructionOutput().setName("C.out"));
+ Node cPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("C.out"), PCOLLECTION_ID);
Node d =
ParallelInstructionNode.create(
new ParallelInstruction().setName("D"), Nodes.ExecutionLocation.UNKNOWN);
Edge dOutput = DefaultEdge.create();
- Node dPCollection = InstructionOutputNode.create(new InstructionOutput().setName("D.out"));
+ Node dPCollection =
+ InstructionOutputNode.create(new InstructionOutput().setName("D.out"), PCOLLECTION_ID);
// A --\
// -> Flatten --> C
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java
index 801f4af..6f0b826 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java
@@ -148,6 +148,6 @@ public final class ReplacePgbkWithPrecombineFunctionTest {
/** Creates an {@link InstructionOutputNode} to act as a PCollection. */
private static InstructionOutputNode createInstructionOutputNode(String name) {
- return InstructionOutputNode.create(new InstructionOutput().setName(name));
+ return InstructionOutputNode.create(new InstructionOutput().setName(name), "fakeId");
}
}