You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/11/15 17:00:36 UTC

[2/7] beam git commit: Replace the View.As transforms for Dataflow batch because the entire implementation is specialized.

Replace the View.As transforms for Dataflow batch because the entire implementation is specialized.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2f815c5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2f815c5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2f815c5

Branch: refs/heads/master
Commit: c2f815c581cf5d6cd72b600cef42f436c9702d85
Parents: d5a8aea
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 14 07:47:53 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800

----------------------------------------------------------------------
 .../runners/dataflow/BatchViewOverrides.java    |  53 ++++-----
 .../runners/dataflow/CreateDataflowView.java    |  19 +++-
 .../beam/runners/dataflow/DataflowRunner.java   | 110 ++++++++++++++++---
 .../dataflow/StreamingViewOverrides.java        |   2 +-
 .../DataflowPipelineTranslatorTest.java         |  12 +-
 5 files changed, 142 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 9a77b4b..8ed41cb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -116,7 +116,7 @@ class BatchViewOverrides {
    * a warning to users to specify a deterministic key coder.
    */
   static class BatchViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+      extends PTransform<PCollection<KV<K, V>>, PCollection<?>> {
 
     /**
      * A {@link DoFn} which groups elements by window boundaries. For each group,
@@ -193,11 +193,11 @@ class BatchViewOverrides {
     }
 
     @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+    public PCollection<?> expand(PCollection<KV<K, V>> input) {
       return this.<BoundedWindow>applyInternal(input);
     }
 
-    private <W extends BoundedWindow> PCollectionView<Map<K, V>>
+    private <W extends BoundedWindow> PCollection<?>
     applyInternal(PCollection<KV<K, V>> input) {
       try {
         return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */);
@@ -216,7 +216,7 @@ class BatchViewOverrides {
     }
 
     /** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */
-    private <W extends BoundedWindow> PCollectionView<Map<K, V>>
+    private <W extends BoundedWindow> PCollection<?>
     applyForSingletonFallback(PCollection<KV<K, V>> input) {
       @SuppressWarnings("unchecked")
       Coder<W> windowCoder = (Coder<W>)
@@ -280,7 +280,7 @@ class BatchViewOverrides {
    * a warning to users to specify a deterministic key coder.
    */
   static class BatchViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+      extends PTransform<PCollection<KV<K, V>>, PCollection<?>> {
     /**
      * A {@link PTransform} that groups elements by the hash of window's byte representation
      * if the input {@link PCollection} is not within the global window. Otherwise by the hash
@@ -672,11 +672,11 @@ class BatchViewOverrides {
     }
 
     @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+    public PCollection<?> expand(PCollection<KV<K, V>> input) {
       return this.<BoundedWindow>applyInternal(input);
     }
 
-    private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
+    private <W extends BoundedWindow> PCollection<?>
     applyInternal(PCollection<KV<K, V>> input) {
       try {
         return applyForMapLike(runner, input, view, false /* unique keys not expected */);
@@ -690,7 +690,7 @@ class BatchViewOverrides {
     }
 
     /** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */
-    private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
+    private <W extends BoundedWindow> PCollection<?>
     applyForSingletonFallback(PCollection<KV<K, V>> input) {
       @SuppressWarnings("unchecked")
       Coder<W> windowCoder = (Coder<W>)
@@ -727,7 +727,7 @@ class BatchViewOverrides {
                   view);
     }
 
-    private static <K, V, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForMapLike(
+    private static <K, V, W extends BoundedWindow, ViewT> PCollection<?> applyForMapLike(
         DataflowRunner runner,
         PCollection<KV<K, V>> input,
         PCollectionView<ViewT> view,
@@ -804,9 +804,10 @@ class BatchViewOverrides {
           PCollectionList.of(ImmutableList.of(
               perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata));
 
-      Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections())
-          .apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>of(view));
-      return view;
+      PCollection<IsmRecord<WindowedValue<V>>> flattenedOutputs =
+          Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections());
+      flattenedOutputs.apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>forBatch(view));
+      return flattenedOutputs;
     }
 
     @Override
@@ -843,7 +844,7 @@ class BatchViewOverrides {
    * </ul>
    */
   static class BatchViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
+      extends PTransform<PCollection<T>, PCollection<?>> {
 
     /**
      * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
@@ -900,7 +901,7 @@ class BatchViewOverrides {
     }
 
     @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
+    public PCollection<?> expand(PCollection<T> input) {
       @SuppressWarnings("unchecked")
       Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
           input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -913,7 +914,7 @@ class BatchViewOverrides {
           view);
     }
 
-    static <T, FinalT, ViewT, W extends BoundedWindow> PCollectionView<ViewT>
+    static <T, FinalT, ViewT, W extends BoundedWindow> PCollection<?>
     applyForSingleton(
         DataflowRunner runner,
         PCollection<T> input,
@@ -936,8 +937,8 @@ class BatchViewOverrides {
 
       runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
       reifiedPerWindowAndSorted.apply(
-          CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view));
-      return view;
+          CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>forBatch(view));
+      return reifiedPerWindowAndSorted;
     }
 
     @Override
@@ -969,7 +970,7 @@ class BatchViewOverrides {
    * </ul>
    */
   static class BatchViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+      extends PTransform<PCollection<T>, PCollection<?>> {
     /**
      * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the
      * global window. Each {@link IsmRecord} has
@@ -1054,11 +1055,11 @@ class BatchViewOverrides {
     }
 
     @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
+    public PCollection<?> expand(PCollection<T> input) {
       return applyForIterableLike(runner, input, view);
     }
 
-    static <T, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForIterableLike(
+    static <T, W extends BoundedWindow, ViewT> PCollection<?> applyForIterableLike(
         DataflowRunner runner,
         PCollection<T> input,
         PCollectionView<ViewT> view) {
@@ -1081,8 +1082,8 @@ class BatchViewOverrides {
 
         runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
         reifiedPerWindowAndSorted.apply(
-            CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
-        return view;
+            CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>forBatch(view));
+        return reifiedPerWindowAndSorted;
       }
 
       PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input
@@ -1092,8 +1093,8 @@ class BatchViewOverrides {
 
       runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
       reifiedPerWindowAndSorted.apply(
-          CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
-      return view;
+          CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>forBatch(view));
+      return reifiedPerWindowAndSorted;
     }
 
     @Override
@@ -1127,7 +1128,7 @@ class BatchViewOverrides {
    * </ul>
    */
   static class BatchViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+      extends PTransform<PCollection<T>, PCollection<?>> {
 
     private final DataflowRunner runner;
     private final PCollectionView<Iterable<T>> view;
@@ -1140,7 +1141,7 @@ class BatchViewOverrides {
     }
 
     @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+    public PCollection<?> expand(PCollection<T> input) {
       return BatchViewAsList.applyForIterableLike(runner, input, view);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index 3b01d69..10888c2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -25,20 +25,29 @@ import org.apache.beam.sdk.values.PCollectionView;
 /** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
 public class CreateDataflowView<ElemT, ViewT>
     extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
-  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
-    return new CreateDataflowView<>(view);
+  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(PCollectionView<ViewT> view) {
+    return new CreateDataflowView<>(view, false);
+  }
+
+  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(PCollectionView<ViewT> view) {
+    return new CreateDataflowView<>(view, true);
   }
 
   private final PCollectionView<ViewT> view;
+  private final boolean streaming;
 
-  private CreateDataflowView(PCollectionView<ViewT> view) {
+  private CreateDataflowView(PCollectionView<ViewT> view, boolean streaming) {
     this.view = view;
+    this.streaming = streaming;
   }
 
   @Override
   public PCollection<ElemT> expand(PCollection<ElemT> input) {
-    return PCollection.createPrimitiveOutputInternal(
-        input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
+    if (streaming) {
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
+    }
+    return (PCollection) view.getPCollection();
   }
 
   public PCollectionView<ViewT> getView() {

http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0a20a0f..72e4f83 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -58,6 +58,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
@@ -124,6 +125,7 @@ import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -139,7 +141,6 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
@@ -395,28 +396,28 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                   BatchStatefulParDoOverrides.singleOutputOverrideFactory(options)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class),
-                  new ReflectiveOneToOneOverrideFactory(
+                  PTransformMatchers.classEqualTo(View.AsMap.class),
+                  new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsMap.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class),
-                  new ReflectiveOneToOneOverrideFactory(
+                  PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                  new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsMultimap.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
-                  new ReflectiveOneToOneOverrideFactory(
+                  PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                  new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsSingleton.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class),
-                  new ReflectiveOneToOneOverrideFactory(
+                  PTransformMatchers.classEqualTo(View.AsList.class),
+                  new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsList.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
-                  new ReflectiveOneToOneOverrideFactory(
+                  PTransformMatchers.classEqualTo(View.AsIterable.class),
+                  new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsIterable.class, this)));
     }
     overridesBuilder
@@ -435,6 +436,81 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     return overridesBuilder.build();
   }
 
+  /**
+   * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required that the
+   * new replacement transform uses the supplied PCollectionView and does not create another instance.
+   */
+  private static class ReflectiveViewOverrideFactory<InputT, ViewT>
+      implements PTransformOverrideFactory<PCollection<InputT>,
+      PValue, PTransform<PCollection<InputT>, PValue>> {
+
+    private final Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement;
+    private final DataflowRunner runner;
+
+    private ReflectiveViewOverrideFactory(
+        Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement,
+        DataflowRunner runner) {
+      this.replacement = replacement;
+      this.runner = runner;
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+         final AppliedPTransform<PCollection<InputT>,
+             PValue,
+             PTransform<PCollection<InputT>, PValue>> transform) {
+
+      final AtomicReference<CreatePCollectionView> viewTransformRef = new AtomicReference<>();
+      transform.getPipeline().traverseTopologically(new PipelineVisitor.Defaults() {
+        // Stores whether we have entered the expected composite view transform.
+        private boolean tracking = false;
+
+        @Override
+        public CompositeBehavior enterCompositeTransform(Node node) {
+          if (transform.getTransform() == node.getTransform()) {
+            tracking = true;
+          }
+          return super.enterCompositeTransform(node);
+        }
+
+        @Override
+        public void visitPrimitiveTransform(Node node) {
+          if (tracking && node.getTransform() instanceof CreatePCollectionView) {
+            checkState(
+                viewTransformRef.compareAndSet(null, (CreatePCollectionView) node.getTransform()),
+                "Found more than one instance of a CreatePCollectionView when "
+                    + "attempting to replace %s, found [%s, %s]",
+                replacement, viewTransformRef.get(), node.getTransform());
+          }
+        }
+
+        @Override
+        public void leaveCompositeTransform(Node node) {
+          if (transform.getTransform() == node.getTransform()) {
+            tracking = false;
+          }
+        }
+      });
+
+      checkState(viewTransformRef.get() != null,
+          "Expected to find CreatePCollectionView contained within %s",
+          transform.getTransform());
+      PTransform<PCollection<InputT>, PCollectionView<ViewT>> rep =
+          InstanceBuilder.ofType(replacement)
+              .withArg(DataflowRunner.class, runner)
+              .withArg(CreatePCollectionView.class, viewTransformRef.get())
+              .build();
+      return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
+      // We do not replace any of the outputs because we expect that the new PTransform will re-use the original
+      // PCollectionView that was returned.
+      return ImmutableMap.of();
+    }
+  }
+
   private static class ReflectiveOneToOneOverrideFactory<
           InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
       extends SingleInputOutputOverrideFactory<
@@ -1258,19 +1334,21 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
    */
   private static class Deduplicate<T>
       extends PTransform<PCollection<ValueWithRecordId<T>>, PCollection<T>> {
+
     // Use a finite set of keys to improve bundling.  Without this, the key space
     // will be the space of ids which is potentially very large, which results in much
     // more per-key overhead.
     private static final int NUM_RESHARD_KEYS = 10000;
+
     @Override
     public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
       return input
           .apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>() {
-                    @Override
-                    public Integer apply(ValueWithRecordId<T> value) {
-                      return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS;
-                    }
-                  }))
+            @Override
+            public Integer apply(ValueWithRecordId<T> value) {
+              return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS;
+            }
+          }))
           // Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
           // WindmillSink.
           .apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())

http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 1853248..69099c6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -68,7 +68,7 @@ class StreamingViewOverrides {
         return input
             .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
             .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
-            .apply(CreateDataflowView.<ElemT, ViewT>of(view));
+            .apply(CreateDataflowView.<ElemT, ViewT>forStreaming(view));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index cc43e27..e03abb9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(10, steps.size());
+    assertEquals(5, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(8).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
-    Step collectionToSingletonStep = steps.get(9);
+    Step collectionToSingletonStep = steps.get(4);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 
@@ -1023,16 +1023,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(4, steps.size());
+    assertEquals(3, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(2).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
 
-    Step collectionToSingletonStep = steps.get(3);
+    Step collectionToSingletonStep = steps.get(2);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }