You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/11/15 17:00:36 UTC
[2/7] beam git commit: Replace the View.As transforms for Dataflow
batch because the entire implementation is specialized.
Replace the View.As transforms for Dataflow batch because the entire implementation is specialized.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2f815c5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2f815c5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2f815c5
Branch: refs/heads/master
Commit: c2f815c581cf5d6cd72b600cef42f436c9702d85
Parents: d5a8aea
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 14 07:47:53 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800
----------------------------------------------------------------------
.../runners/dataflow/BatchViewOverrides.java | 53 ++++-----
.../runners/dataflow/CreateDataflowView.java | 19 +++-
.../beam/runners/dataflow/DataflowRunner.java | 110 ++++++++++++++++---
.../dataflow/StreamingViewOverrides.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 12 +-
5 files changed, 142 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 9a77b4b..8ed41cb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -116,7 +116,7 @@ class BatchViewOverrides {
* a warning to users to specify a deterministic key coder.
*/
static class BatchViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+ extends PTransform<PCollection<KV<K, V>>, PCollection<?>> {
/**
* A {@link DoFn} which groups elements by window boundaries. For each group,
@@ -193,11 +193,11 @@ class BatchViewOverrides {
}
@Override
- public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+ public PCollection<?> expand(PCollection<KV<K, V>> input) {
return this.<BoundedWindow>applyInternal(input);
}
- private <W extends BoundedWindow> PCollectionView<Map<K, V>>
+ private <W extends BoundedWindow> PCollection<?>
applyInternal(PCollection<KV<K, V>> input) {
try {
return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */);
@@ -216,7 +216,7 @@ class BatchViewOverrides {
}
/** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */
- private <W extends BoundedWindow> PCollectionView<Map<K, V>>
+ private <W extends BoundedWindow> PCollection<?>
applyForSingletonFallback(PCollection<KV<K, V>> input) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
@@ -280,7 +280,7 @@ class BatchViewOverrides {
* a warning to users to specify a deterministic key coder.
*/
static class BatchViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+ extends PTransform<PCollection<KV<K, V>>, PCollection<?>> {
/**
* A {@link PTransform} that groups elements by the hash of window's byte representation
* if the input {@link PCollection} is not within the global window. Otherwise by the hash
@@ -672,11 +672,11 @@ class BatchViewOverrides {
}
@Override
- public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+ public PCollection<?> expand(PCollection<KV<K, V>> input) {
return this.<BoundedWindow>applyInternal(input);
}
- private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
+ private <W extends BoundedWindow> PCollection<?>
applyInternal(PCollection<KV<K, V>> input) {
try {
return applyForMapLike(runner, input, view, false /* unique keys not expected */);
@@ -690,7 +690,7 @@ class BatchViewOverrides {
}
/** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */
- private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
+ private <W extends BoundedWindow> PCollection<?>
applyForSingletonFallback(PCollection<KV<K, V>> input) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
@@ -727,7 +727,7 @@ class BatchViewOverrides {
view);
}
- private static <K, V, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForMapLike(
+ private static <K, V, W extends BoundedWindow, ViewT> PCollection<?> applyForMapLike(
DataflowRunner runner,
PCollection<KV<K, V>> input,
PCollectionView<ViewT> view,
@@ -804,9 +804,10 @@ class BatchViewOverrides {
PCollectionList.of(ImmutableList.of(
perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata));
- Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections())
- .apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>of(view));
- return view;
+ PCollection<IsmRecord<WindowedValue<V>>> flattenedOutputs =
+ Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections());
+ flattenedOutputs.apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>forBatch(view));
+ return flattenedOutputs;
}
@Override
@@ -843,7 +844,7 @@ class BatchViewOverrides {
* </ul>
*/
static class BatchViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
+ extends PTransform<PCollection<T>, PCollection<?>> {
/**
* A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
@@ -900,7 +901,7 @@ class BatchViewOverrides {
}
@Override
- public PCollectionView<T> expand(PCollection<T> input) {
+ public PCollection<?> expand(PCollection<T> input) {
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -913,7 +914,7 @@ class BatchViewOverrides {
view);
}
- static <T, FinalT, ViewT, W extends BoundedWindow> PCollectionView<ViewT>
+ static <T, FinalT, ViewT, W extends BoundedWindow> PCollection<?>
applyForSingleton(
DataflowRunner runner,
PCollection<T> input,
@@ -936,8 +937,8 @@ class BatchViewOverrides {
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
reifiedPerWindowAndSorted.apply(
- CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view));
- return view;
+ CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>forBatch(view));
+ return reifiedPerWindowAndSorted;
}
@Override
@@ -969,7 +970,7 @@ class BatchViewOverrides {
* </ul>
*/
static class BatchViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+ extends PTransform<PCollection<T>, PCollection<?>> {
/**
* A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the
* global window. Each {@link IsmRecord} has
@@ -1054,11 +1055,11 @@ class BatchViewOverrides {
}
@Override
- public PCollectionView<List<T>> expand(PCollection<T> input) {
+ public PCollection<?> expand(PCollection<T> input) {
return applyForIterableLike(runner, input, view);
}
- static <T, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForIterableLike(
+ static <T, W extends BoundedWindow, ViewT> PCollection<?> applyForIterableLike(
DataflowRunner runner,
PCollection<T> input,
PCollectionView<ViewT> view) {
@@ -1081,8 +1082,8 @@ class BatchViewOverrides {
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
reifiedPerWindowAndSorted.apply(
- CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
- return view;
+ CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>forBatch(view));
+ return reifiedPerWindowAndSorted;
}
PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input
@@ -1092,8 +1093,8 @@ class BatchViewOverrides {
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
reifiedPerWindowAndSorted.apply(
- CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
- return view;
+ CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>forBatch(view));
+ return reifiedPerWindowAndSorted;
}
@Override
@@ -1127,7 +1128,7 @@ class BatchViewOverrides {
* </ul>
*/
static class BatchViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+ extends PTransform<PCollection<T>, PCollection<?>> {
private final DataflowRunner runner;
private final PCollectionView<Iterable<T>> view;
@@ -1140,7 +1141,7 @@ class BatchViewOverrides {
}
@Override
- public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+ public PCollection<?> expand(PCollection<T> input) {
return BatchViewAsList.applyForIterableLike(runner, input, view);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index 3b01d69..10888c2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -25,20 +25,29 @@ import org.apache.beam.sdk.values.PCollectionView;
/** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
public class CreateDataflowView<ElemT, ViewT>
extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
- public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
- return new CreateDataflowView<>(view);
+ public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(PCollectionView<ViewT> view) {
+ return new CreateDataflowView<>(view, false);
+ }
+
+ public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(PCollectionView<ViewT> view) {
+ return new CreateDataflowView<>(view, true);
}
private final PCollectionView<ViewT> view;
+ private final boolean streaming;
- private CreateDataflowView(PCollectionView<ViewT> view) {
+ private CreateDataflowView(PCollectionView<ViewT> view, boolean streaming) {
this.view = view;
+ this.streaming = streaming;
}
@Override
public PCollection<ElemT> expand(PCollection<ElemT> input) {
- return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
+ if (streaming) {
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
+ }
+ return (PCollection) view.getPCollection();
}
public PCollectionView<ViewT> getView() {
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0a20a0f..72e4f83 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -58,6 +58,7 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
@@ -124,6 +125,7 @@ import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -139,7 +141,6 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
@@ -395,28 +396,28 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
BatchStatefulParDoOverrides.singleOutputOverrideFactory(options)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class),
- new ReflectiveOneToOneOverrideFactory(
+ PTransformMatchers.classEqualTo(View.AsMap.class),
+ new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsMap.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class),
- new ReflectiveOneToOneOverrideFactory(
+ PTransformMatchers.classEqualTo(View.AsMultimap.class),
+ new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsMultimap.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
- new ReflectiveOneToOneOverrideFactory(
+ PTransformMatchers.classEqualTo(View.AsSingleton.class),
+ new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsSingleton.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class),
- new ReflectiveOneToOneOverrideFactory(
+ PTransformMatchers.classEqualTo(View.AsList.class),
+ new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsList.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
- new ReflectiveOneToOneOverrideFactory(
+ PTransformMatchers.classEqualTo(View.AsIterable.class),
+ new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsIterable.class, this)));
}
overridesBuilder
@@ -435,6 +436,81 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
return overridesBuilder.build();
}
+ /**
+ * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required that the
+ * new replacement transform uses the supplied PCollectionView and does not create another instance.
+ */
+ private static class ReflectiveViewOverrideFactory<InputT, ViewT>
+ implements PTransformOverrideFactory<PCollection<InputT>,
+ PValue, PTransform<PCollection<InputT>, PValue>> {
+
+ private final Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement;
+ private final DataflowRunner runner;
+
+ private ReflectiveViewOverrideFactory(
+ Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement,
+ DataflowRunner runner) {
+ this.replacement = replacement;
+ this.runner = runner;
+ }
+
+ @Override
+ public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+ final AppliedPTransform<PCollection<InputT>,
+ PValue,
+ PTransform<PCollection<InputT>, PValue>> transform) {
+
+ final AtomicReference<CreatePCollectionView> viewTransformRef = new AtomicReference<>();
+ transform.getPipeline().traverseTopologically(new PipelineVisitor.Defaults() {
+ // Stores whether we have entered the expected composite view transform.
+ private boolean tracking = false;
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
+ if (transform.getTransform() == node.getTransform()) {
+ tracking = true;
+ }
+ return super.enterCompositeTransform(node);
+ }
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ if (tracking && node.getTransform() instanceof CreatePCollectionView) {
+ checkState(
+ viewTransformRef.compareAndSet(null, (CreatePCollectionView) node.getTransform()),
+ "Found more than one instance of a CreatePCollectionView when "
+ + "attempting to replace %s, found [%s, %s]",
+ replacement, viewTransformRef.get(), node.getTransform());
+ }
+ }
+
+ @Override
+ public void leaveCompositeTransform(Node node) {
+ if (transform.getTransform() == node.getTransform()) {
+ tracking = false;
+ }
+ }
+ });
+
+ checkState(viewTransformRef.get() != null,
+ "Expected to find CreatePCollectionView contained within %s",
+ transform.getTransform());
+ PTransform<PCollection<InputT>, PCollectionView<ViewT>> rep =
+ InstanceBuilder.ofType(replacement)
+ .withArg(DataflowRunner.class, runner)
+ .withArg(CreatePCollectionView.class, viewTransformRef.get())
+ .build();
+ return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
+ // We do not replace any of the outputs because we expect that the new PTransform will re-use the original
+ // PCollectionView that was returned.
+ return ImmutableMap.of();
+ }
+ }
+
private static class ReflectiveOneToOneOverrideFactory<
InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
extends SingleInputOutputOverrideFactory<
@@ -1258,19 +1334,21 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
*/
private static class Deduplicate<T>
extends PTransform<PCollection<ValueWithRecordId<T>>, PCollection<T>> {
+
// Use a finite set of keys to improve bundling. Without this, the key space
// will be the space of ids which is potentially very large, which results in much
// more per-key overhead.
private static final int NUM_RESHARD_KEYS = 10000;
+
@Override
public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
return input
.apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>() {
- @Override
- public Integer apply(ValueWithRecordId<T> value) {
- return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS;
- }
- }))
+ @Override
+ public Integer apply(ValueWithRecordId<T> value) {
+ return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS;
+ }
+ }))
// Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
// WindmillSink.
.apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 1853248..69099c6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -68,7 +68,7 @@ class StreamingViewOverrides {
return input
.apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
- .apply(CreateDataflowView.<ElemT, ViewT>of(view));
+ .apply(CreateDataflowView.<ElemT, ViewT>forStreaming(view));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index cc43e27..e03abb9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(10, steps.size());
+ assertEquals(5, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(8).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(9);
+ Step collectionToSingletonStep = steps.get(4);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
@@ -1023,16 +1023,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(4, steps.size());
+ assertEquals(3, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(2).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(3);
+ Step collectionToSingletonStep = steps.get(2);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}