You are viewing a plain text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/17 18:48:36 UTC
[1/2] incubator-beam git commit: Move the step output ids to use a flat namespace. Also add a logical mapping from tuple tag to the flat namespace for DoFns.
Repository: incubator-beam
Updated Branches:
refs/heads/master b8e6eea69 -> d69b324c4
Move the step output ids to use a flat namespace.
Also add a logical mapping from tuple tag to the flat namespace for DoFns.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/17782007
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/17782007
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/17782007
Branch: refs/heads/master
Commit: 177820074d20e6ac72949f763f52cfb481904fc5
Parents: b8e6eea
Author: Luke Cwik <lc...@google.com>
Authored: Thu Oct 13 15:33:49 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 17 11:47:42 2016 -0700
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 111 ++++++++++++-------
.../beam/runners/dataflow/DataflowRunner.java | 12 +-
.../dataflow/internal/ReadTranslator.java | 2 +-
.../beam/runners/dataflow/util/DoFnInfo.java | 30 +++--
.../DataflowPipelineTranslatorTest.java | 38 ++++++-
.../runners/dataflow/DataflowRunnerTest.java | 6 +-
6 files changed, 136 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 0d72881..c0366fc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -41,6 +41,10 @@ import com.google.api.services.dataflow.model.Environment;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
+import com.google.common.base.Supplier;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,6 +53,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
@@ -300,32 +305,30 @@ public class DataflowPipelineTranslator {
public void addInput(String name, List<? extends Map<String, Object>> elements);
/**
- * Adds an output with the given name to the previously added
- * Dataflow step, producing the specified output {@code PValue},
+ * Adds an output to the previously added Dataflow step,
+ * producing the specified output {@code PValue},
* including its {@code Coder} if a {@code TypedPValue}. If the
* {@code PValue} is a {@code PCollection}, wraps its coder inside
- * a {@code WindowedValueCoder}.
+ * a {@code WindowedValueCoder}. Returns a pipeline level unique id.
*/
- public void addOutput(String name, PValue value);
+ public long addOutput(PValue value);
/**
- * Adds an output with the given name to the previously added
- * Dataflow step, producing the specified output {@code PValue},
+ * Adds an output to the previously added Dataflow step,
+ * producing the specified output {@code PValue},
* including its {@code Coder} if a {@code TypedPValue}. If the
* {@code PValue} is a {@code PCollection}, wraps its coder inside
- * a {@code ValueOnlyCoder}.
+ * a {@code ValueOnlyCoder}. Returns a pipeline level unique id.
*/
- public void addValueOnlyOutput(String name, PValue value);
+ public long addValueOnlyOutput(PValue value);
/**
- * Adds an output with the given name to the previously added
- * CollectionToSingleton Dataflow step, consuming the specified
- * input {@code PValue} and producing the specified output
+ * Adds an output to the previously added CollectionToSingleton Dataflow step,
+ * consuming the specified input {@code PValue} and producing the specified output
* {@code PValue}. This step requires special treatment for its
- * output encoding.
+ * output encoding. Returns a pipeline level unique id.
*/
- public void addCollectionToSingletonOutput(String name,
- PValue inputValue,
+ public long addCollectionToSingletonOutput(PValue inputValue,
PValue outputValue);
/**
@@ -341,6 +344,19 @@ public class DataflowPipelineTranslator {
* Translates a Pipeline into the Dataflow representation.
*/
class Translator extends PipelineVisitor.Defaults implements TranslationContext {
+  /**
+   * Generates pipeline-level unique ids (starting at 1) for the step outputs of this Translator.
+   * Wrapped in a {@link Supplier} so callers can only draw the next id and cannot misuse the
+   * contained {@link AtomicLong} (for example by reading or setting it without incrementing).
+   */
+  private final Supplier<Long> idGenerator = new Supplier<Long>() {
+    private final AtomicLong generator = new AtomicLong(1L);
+    @Override
+    public Long get() {
+      return generator.getAndIncrement();
+    }
+  };
+
/** The Pipeline to translate. */
private final Pipeline pipeline;
@@ -634,7 +650,7 @@ public class DataflowPipelineTranslator {
}
@Override
- public void addOutput(String name, PValue value) {
+ public long addOutput(PValue value) {
Coder<?> coder;
if (value instanceof TypedPValue) {
coder = ((TypedPValue<?>) value).getCoder();
@@ -648,11 +664,11 @@ public class DataflowPipelineTranslator {
// No output coder to encode.
coder = null;
}
- addOutput(name, value, coder);
+ return addOutput(value, coder);
}
@Override
- public void addValueOnlyOutput(String name, PValue value) {
+ public long addValueOnlyOutput(PValue value) {
Coder<?> coder;
if (value instanceof TypedPValue) {
coder = ((TypedPValue<?>) value).getCoder();
@@ -665,12 +681,11 @@ public class DataflowPipelineTranslator {
// No output coder to encode.
coder = null;
}
- addOutput(name, value, coder);
+ return addOutput(value, coder);
}
@Override
- public void addCollectionToSingletonOutput(String name,
- PValue inputValue,
+ public long addCollectionToSingletonOutput(PValue inputValue,
PValue outputValue) {
Coder<?> inputValueCoder =
checkNotNull(outputCoders.get(inputValue));
@@ -683,7 +698,7 @@ public class DataflowPipelineTranslator {
// IterableCoder of the inputValueCoder. This is a property
// of the backend "CollectionToSingleton" step.
Coder<?> outputValueCoder = IterableCoder.of(inputValueCoder);
- addOutput(name, outputValue, outputValueCoder);
+ return addOutput(outputValue, outputValueCoder);
}
/**
@@ -691,8 +706,9 @@ public class DataflowPipelineTranslator {
* Dataflow step, producing the specified output {@code PValue}
* with the given {@code Coder} (if not {@code null}).
*/
- private void addOutput(String name, PValue value, Coder<?> valueCoder) {
- registerOutputName(value, name);
+ private long addOutput(PValue value, Coder<?> valueCoder) {
+ long id = idGenerator.get();
+ registerOutputName(value, Long.toString(id));
Map<String, Object> properties = getProperties();
@Nullable List<Map<String, Object>> outputInfoList = null;
@@ -709,7 +725,7 @@ public class DataflowPipelineTranslator {
}
Map<String, Object> outputInfo = new HashMap<>();
- addString(outputInfo, PropertyNames.OUTPUT_NAME, name);
+ addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id));
addString(outputInfo, PropertyNames.USER_NAME, value.getName());
if (value instanceof PCollection
&& runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
@@ -724,6 +740,7 @@ public class DataflowPipelineTranslator {
}
outputInfoList.add(outputInfo);
+ return id;
}
private void addDisplayData(String stepName, HasDisplayData hasDisplayData) {
@@ -805,7 +822,6 @@ public class DataflowPipelineTranslator {
context.addStep(transform, "CollectionToSingleton");
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
context.addCollectionToSingletonOutput(
- PropertyNames.OUTPUT,
context.getInput(transform),
context.getOutput(transform));
}
@@ -837,7 +853,7 @@ public class DataflowPipelineTranslator {
context.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeToByteArray(fn)));
- context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+ context.addOutput(context.getOutput(transform));
}
});
@@ -861,7 +877,7 @@ public class DataflowPipelineTranslator {
inputs.add(context.asOutputReference(input));
}
context.addInput(PropertyNames.INPUTS, inputs);
- context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+ context.addOutput(context.getOutput(transform));
}
});
@@ -880,7 +896,7 @@ public class DataflowPipelineTranslator {
TranslationContext context) {
context.addStep(transform, "GroupByKey");
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
- context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+ context.addOutput(context.getOutput(transform));
context.addInput(PropertyNames.SORT_VALUES, true);
// TODO: Add support for combiner lifting once the need arises.
@@ -904,7 +920,7 @@ public class DataflowPipelineTranslator {
TranslationContext context) {
context.addStep(transform, "GroupByKey");
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
- context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+ context.addOutput(context.getOutput(transform));
WindowingStrategy<?, ?> windowingStrategy =
context.getInput(transform).getWindowingStrategy();
@@ -941,9 +957,16 @@ public class DataflowPipelineTranslator {
TranslationContext context) {
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
- translateFn(transform.getFn(), context.getInput(transform).getWindowingStrategy(),
- transform.getSideInputs(), context.getInput(transform).getCoder(), context);
- translateOutputs(context.getOutput(transform), context);
+ BiMap<Long, TupleTag<?>> outputMap =
+ translateOutputs(context.getOutput(transform), context);
+ translateFn(
+ transform.getFn(),
+ context.getInput(transform).getWindowingStrategy(),
+ transform.getSideInputs(),
+ context.getInput(transform).getCoder(),
+ context,
+ outputMap.inverse().get(transform.getMainOutputTag()),
+ outputMap);
}
});
@@ -962,11 +985,17 @@ public class DataflowPipelineTranslator {
TranslationContext context) {
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
+ long mainOutput = context.addOutput(context.getOutput(transform));
translateFn(
transform.getFn(),
context.getInput(transform).getWindowingStrategy(),
- transform.getSideInputs(), context.getInput(transform).getCoder(), context);
- context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+ transform.getSideInputs(),
+ context.getInput(transform).getCoder(),
+ context,
+ mainOutput,
+ ImmutableMap.<Long, TupleTag<?>>of(mainOutput,
+ new TupleTag<>(PropertyNames.OUTPUT)));
+
}
});
@@ -983,7 +1012,7 @@ public class DataflowPipelineTranslator {
Window.Bound<T> transform, TranslationContext context) {
context.addStep(transform, "Bucket");
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
- context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+ context.addOutput(context.getOutput(transform));
WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
byte[] serializedBytes = serializeToByteArray(strategy);
@@ -1028,22 +1057,26 @@ public class DataflowPipelineTranslator {
WindowingStrategy windowingStrategy,
Iterable<PCollectionView<?>> sideInputs,
Coder inputCoder,
- TranslationContext context) {
+ TranslationContext context,
+ long mainOutput,
+ Map<Long, TupleTag<?>> outputMap) {
context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
context.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeToByteArray(
- new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder))));
+ new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap))));
}
- private static void translateOutputs(
+ private static BiMap<Long, TupleTag<?>> translateOutputs(
PCollectionTuple outputs,
TranslationContext context) {
+ ImmutableBiMap.Builder<Long, TupleTag<?>> mapBuilder = ImmutableBiMap.builder();
for (Map.Entry<TupleTag<?>, PCollection<?>> entry
: outputs.getAll().entrySet()) {
TupleTag<?> tag = entry.getKey();
PCollection<?> output = entry.getValue();
- context.addOutput(tag.getId(), output);
+ mapBuilder.put(context.addOutput(output), tag);
}
+ return mapBuilder.build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 646a145..55a01f7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1824,7 +1824,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// outputting to all the outputs defined above.
PCollectionTuple outputTuple = input
.apply("GBKaSVForData", new GroupByKeyHashAndSortByKeyAndWindow<K, V, W>(ismCoder))
- .apply(ParDo.of(new ToIsmRecordForMapLikeDoFn<K, V, W>(
+ .apply(ParDo.of(new ToIsmRecordForMapLikeDoFn<>(
outputForSizeTag, outputForEntrySetTag,
windowCoder, inputCoder.getKeyCoder(), ismCoder, uniqueKeysExpected))
.withOutputTags(mainOutputTag,
@@ -2116,7 +2116,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
if (overriddenTransform.getIdLabel() != null) {
context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
}
- context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+ context.addValueOnlyOutput(context.getOutput(transform));
}
}
@@ -2215,10 +2215,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
source.validate();
if (source.requiresDeduping()) {
- return Pipeline.applyTransform(input, new ReadWithIds<T>(source))
+ return Pipeline.applyTransform(input, new ReadWithIds<>(source))
.apply(new Deduplicate<T>());
} else {
- return Pipeline.applyTransform(input, new ReadWithIds<T>(source))
+ return Pipeline.applyTransform(input, new ReadWithIds<>(source))
.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>()));
}
}
@@ -2348,7 +2348,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
public static <T> StreamingPCollectionViewWriterFn<T> create(
PCollectionView<?> view, Coder<T> dataCoder) {
- return new StreamingPCollectionViewWriterFn<T>(view, dataCoder);
+ return new StreamingPCollectionViewWriterFn<>(view, dataCoder);
}
private StreamingPCollectionViewWriterFn(PCollectionView<?> view, Coder<T> dataCoder) {
@@ -2648,7 +2648,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
private static class Concatenate<T> extends CombineFn<T, List<T>, List<T>> {
@Override
public List<T> createAccumulator() {
- return new ArrayList<T>();
+ return new ArrayList<>();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 094f405..83836c0 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -62,7 +62,7 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
PropertyNames.SOURCE_STEP_INPUT,
cloudSourceToDictionary(
CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
- context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+ context.addValueOnlyOutput(context.getOutput(transform));
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index 949c381..b211c04 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -18,10 +18,12 @@
package org.apache.beam.runners.dataflow.util;
import java.io.Serializable;
+import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
/**
* Wrapper class holding the necessary information to serialize a {@link OldDoFn}.
@@ -34,20 +36,21 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
private final WindowingStrategy<?, ?> windowingStrategy;
private final Iterable<PCollectionView<?>> sideInputViews;
private final Coder<InputT> inputCoder;
+ private final long mainOutput;
+ private final Map<Long, TupleTag<?>> outputMap;
- public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
- this.doFn = doFn;
- this.windowingStrategy = windowingStrategy;
- this.sideInputViews = null;
- this.inputCoder = null;
- }
-
- public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
- Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) {
+ public DoFnInfo(OldDoFn<InputT, OutputT> doFn,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Iterable<PCollectionView<?>> sideInputViews,
+ Coder<InputT> inputCoder,
+ long mainOutput,
+ Map<Long, TupleTag<?>> outputMap) {
this.doFn = doFn;
this.windowingStrategy = windowingStrategy;
this.sideInputViews = sideInputViews;
this.inputCoder = inputCoder;
+ this.mainOutput = mainOutput;
+ this.outputMap = outputMap;
}
public OldDoFn<InputT, OutputT> getDoFn() {
@@ -65,5 +68,12 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
public Coder<InputT> getInputCoder() {
return inputCoder;
}
-}
+  public long getMainOutput() {  // flat-namespace id assigned to the main output
+    return this.mainOutput;
+  }
+
+  public Map<Long, TupleTag<?>> getOutputMap() {  // flat-namespace output id -> TupleTag
+    return this.outputMap;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 98d2fb0..762844b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -46,12 +46,16 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -465,6 +469,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
(DataflowRunner) pipeline.getRunner(),
Collections.<DataflowPackage>emptyList())
.getJob();
+ assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
assertEquals(4, steps.size());
@@ -523,6 +528,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertEquals(13, job.getSteps().size());
Step step = job.getSteps().get(1);
assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
+ assertAllStepOutputsHaveUniqueIds(job);
return step;
}
@@ -637,7 +643,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
@Test
- public void testMultiGraphPipelineSerialization() throws IOException {
+ public void testMultiGraphPipelineSerialization() throws Exception {
Pipeline p = Pipeline.create(buildPipelineOptions());
PCollection<Integer> input = p.begin()
@@ -650,8 +656,9 @@ public class DataflowPipelineTranslatorTest implements Serializable {
PipelineOptionsFactory.as(DataflowPipelineOptions.class));
// Check that translation doesn't fail.
- t.translate(
+ JobSpecification jobSpecification = t.translate(
p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+ assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
}
@Test
@@ -692,10 +699,11 @@ public class DataflowPipelineTranslatorTest implements Serializable {
applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
// Check that translation doesn't fail.
- t.translate(
+ JobSpecification jobSpecification = t.translate(
pipeline,
(DataflowRunner) pipeline.getRunner(),
Collections.<DataflowPackage>emptyList());
+ assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
}
private void applyRead(Pipeline pipeline, String path) {
@@ -744,6 +752,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
(DataflowRunner) pipeline.getRunner(),
Collections.<DataflowPackage>emptyList())
.getJob();
+ assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
assertEquals(2, steps.size());
@@ -753,7 +762,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Step collectionToSingletonStep = steps.get(1);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
-
}
@Test
@@ -776,6 +784,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
(DataflowRunner) pipeline.getRunner(),
Collections.<DataflowPackage>emptyList())
.getJob();
+ assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
assertEquals(2, steps.size());
@@ -806,6 +815,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
(DataflowRunner) pipeline.getRunner(),
Collections.<DataflowPackage>emptyList())
.getJob();
+ assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
assertEquals(5, steps.size());
@@ -839,6 +849,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
(DataflowRunner) pipeline.getRunner(),
Collections.<DataflowPackage>emptyList())
.getJob();
+ assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
assertEquals(3, steps.size());
@@ -902,6 +913,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
(DataflowRunner) pipeline.getRunner(),
Collections.<DataflowPackage>emptyList())
.getJob();
+ assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
assertEquals(3, steps.size());
@@ -963,4 +975,22 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
}
+
+  /** Asserts that no two step outputs in {@code job} share the same OUTPUT_NAME id. */
+  private static void assertAllStepOutputsHaveUniqueIds(Job job) throws Exception {
+    List<Long> outputIds = new ArrayList<>();
+    for (Step step : job.getSteps()) {
+      List<Map<String, Object>> outputInfoList =
+          (List<Map<String, Object>>) step.getProperties().get(PropertyNames.OUTPUT_INFO);
+      if (outputInfoList != null) {
+        for (Map<String, Object> outputInfo : outputInfoList) {
+          outputIds.add(Long.parseLong(Structs.getString(outputInfo, PropertyNames.OUTPUT_NAME)));
+        }
+      }
+    }
+    Set<Long> uniqueOutputIds = new HashSet<>(outputIds);
+    int duplicateCount = outputIds.size() - uniqueOutputIds.size();  // removeAll would drop all
+    assertTrue(String.format("Found %s duplicate output ids in %s", duplicateCount, outputIds),
+        duplicateCount == 0);
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index b0ee231..ddb7cf8 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -895,7 +895,7 @@ public class DataflowRunnerTest {
// Note: This is about the minimum needed to fake out a
// translation. This obviously isn't a real translation.
context.addStep(transform, "TestTranslate");
- context.addOutput("output", context.getOutput(transform));
+ context.addOutput(context.getOutput(transform));
}
});
@@ -1098,7 +1098,7 @@ public class DataflowRunnerTest {
DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
IsmRecord<WindowedValue<Long>>> doFnTester =
- DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>(
+ DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>(
outputForSizeTag,
outputForEntrySetTag,
windowCoder,
@@ -1198,7 +1198,7 @@ public class DataflowRunnerTest {
DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
IsmRecord<WindowedValue<Long>>> doFnTester =
- DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>(
+ DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>(
outputForSizeTag,
outputForEntrySetTag,
windowCoder,
[2/2] incubator-beam git commit: Move the step output ids to use a flat namespace.
Posted by lc...@apache.org.
Move the step output ids to use a flat namespace.
This closes #1099
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d69b324c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d69b324c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d69b324c
Branch: refs/heads/master
Commit: d69b324c482b7960194781e86666b2cfa9d4702a
Parents: b8e6eea 1778200
Author: Luke Cwik <lc...@google.com>
Authored: Mon Oct 17 11:48:18 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 17 11:48:18 2016 -0700
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 111 ++++++++++++-------
.../beam/runners/dataflow/DataflowRunner.java | 12 +-
.../dataflow/internal/ReadTranslator.java | 2 +-
.../beam/runners/dataflow/util/DoFnInfo.java | 30 +++--
.../DataflowPipelineTranslatorTest.java | 38 ++++++-
.../runners/dataflow/DataflowRunnerTest.java | 6 +-
6 files changed, 136 insertions(+), 63 deletions(-)
----------------------------------------------------------------------