Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/12 22:09:05 UTC

[beam] branch master updated: [BEAM-6037] Make Spark runner pipeline translation based on URNs (#7005)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9718c3e  [BEAM-6037] Make Spark runner pipeline translation based on URNs (#7005)
9718c3e is described below

commit 9718c3e6822b204a99e0d7b5e9964f4276a75656
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Mon Nov 12 23:08:58 2018 +0100

    [BEAM-6037] Make Spark runner pipeline translation based on URNs (#7005)
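    
    This replaces the class-keyed evaluator registries in the Spark runner
    (batch and streaming) with registries keyed by the portable pipeline
    URNs from PTransformTranslation. A minimal sketch of the new lookup
    pattern, with names taken from the diff below (the enclosing class,
    imports, and evaluator factory methods such as parDo() are elided):
    
        private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap<>();
    
        static {
          // One entry per supported transform, keyed by its portable URN.
          EVALUATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDo());
          EVALUATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, groupByKey());
        }
    
        @Nullable
        private static TransformEvaluator<?> getTranslator(PTransform<?, ?> transform) {
          // Transforms without a registered payload translator resolve to a
          // null URN and therefore to no evaluator.
          @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
          return urn == null ? null : EVALUATORS.get(urn);
        }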
---
 .../core/construction/PTransformTranslation.java   |  3 +
 .../runners/spark/SparkNativePipelineVisitor.java  |  4 +-
 .../org/apache/beam/runners/spark/SparkRunner.java | 22 +++---
 .../apache/beam/runners/spark/io/ConsoleIO.java    |  1 +
 .../apache/beam/runners/spark/io/CreateStream.java |  1 +
 .../spark/translation/SparkPipelineTranslator.java |  6 +-
 .../spark/translation/TransformTranslator.java     | 47 ++++++------
 .../streaming/StreamingTransformTranslator.java    | 85 ++++++++++++++++------
 .../runners/spark/SparkRunnerDebuggerTest.java     |  4 +-
 9 files changed, 110 insertions(+), 63 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index bf4ffbd..86a414c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.CombineComponents;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.SplittableParDoComponents;
 import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslator;
 import org.apache.beam.sdk.Pipeline;
@@ -88,6 +89,8 @@ public class PTransformTranslation {
       getUrn(StandardPTransforms.Composites.COMBINE_PER_KEY);
   public static final String COMBINE_GLOBALLY_TRANSFORM_URN =
       getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY);
+  public static final String COMBINE_GROUPED_VALUES_TRANSFORM_URN =
+      getUrn(CombineComponents.COMBINE_GROUPED_VALUES);
   public static final String RESHUFFLE_URN = getUrn(StandardPTransforms.Composites.RESHUFFLE);
   public static final String WRITE_FILES_TRANSFORM_URN =
       getUrn(StandardPTransforms.Composites.WRITE_FILES);
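
This is the only change outside the Spark runner: PTransformTranslation
gains a COMBINE_GROUPED_VALUES_TRANSFORM_URN constant, resolved with
getUrn from the CombineComponents enum of the portable protos, so that
runners can match Combine.GroupedValues by URN. Both Spark registries
below use it:

    EVALUATORS.put(PTransformTranslation.COMBINE_GROUPED_VALUES_TRANSFORM_URN, combineGrouped());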
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
index 8e2f917..4662d81 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -91,9 +91,7 @@ public class SparkNativePipelineVisitor extends SparkRunner.Evaluator {
     @SuppressWarnings("unchecked")
     TransformT transform = (TransformT) node.getTransform();
     @SuppressWarnings("unchecked")
-    Class<TransformT> transformClass = (Class<TransformT>) transform.getClass();
-    @SuppressWarnings("unchecked")
-    TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
+    TransformEvaluator<TransformT> evaluator = translate(node, transform);
     if (shouldDebug(node)) {
       transforms.add(new NativeTransform(node, evaluator, transform, false));
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index ca86bde..1eeb1c1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -374,13 +374,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
     @Override
     public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-      if (node.getTransform() != null) {
-        @SuppressWarnings("unchecked")
-        Class<PTransform<?, ?>> transformClass =
-            (Class<PTransform<?, ?>>) node.getTransform().getClass();
-        if (translator.hasTranslation(transformClass) && !shouldDefer(node)) {
+      PTransform<?, ?> transform = node.getTransform();
+      if (transform != null) {
+        if (translator.hasTranslation(transform) && !shouldDefer(node)) {
           LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
-          LOG.debug("Composite transform class: '{}'", transformClass);
+          LOG.debug("Composite transform class: '{}'", transform);
           doVisitTransform(node);
           return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
         }
@@ -433,9 +431,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       @SuppressWarnings("unchecked")
       TransformT transform = (TransformT) node.getTransform();
       @SuppressWarnings("unchecked")
-      Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
-      @SuppressWarnings("unchecked")
-      TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
+      TransformEvaluator<TransformT> evaluator = translate(node, transform);
       LOG.info("Evaluating {}", transform);
       AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(getPipeline());
       ctxt.setCurrentTransform(appliedTransform);
@@ -449,8 +445,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
      */
     protected <TransformT extends PTransform<? super PInput, POutput>>
         TransformEvaluator<TransformT> translate(
-            TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
-      //--- determine if node is bounded/unbounded.
+            TransformHierarchy.Node node, TransformT transform) {
+      // --- determine if node is bounded/unbounded.
       // usually, the input determines if the PCollection to apply the next transformation to
       // is BOUNDED or UNBOUNDED, meaning RDD/DStream.
       Map<TupleTag<?>, PValue> pValues;
@@ -464,8 +460,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       // translate accordingly.
       LOG.debug("Translating {} as {}", transform, isNodeBounded);
       return isNodeBounded.equals(PCollection.IsBounded.BOUNDED)
-          ? translator.translateBounded(transformClass)
-          : translator.translateUnbounded(transformClass);
+          ? translator.translateBounded(transform)
+          : translator.translateUnbounded(transform);
     }
 
     protected PCollection.IsBounded isBoundedCollection(Collection<PValue> pValues) {
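
With instance-based matching, enterCompositeTransform asks the translator
about the composite transform itself (for example Combine.PerKey): if an
evaluator is registered for its URN and translation is not deferred, the
composite is translated directly and DO_NOT_ENTER_TRANSFORM prevents
descending into its expansion. The debug log correspondingly prints the
transform instance instead of its class.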
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
index dd15712..bad6dfb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
@@ -49,6 +49,7 @@ public final class ConsoleIO {
      * @param <T> the type of the elements in the {@link PCollection}
      */
     public static class Unbound<T> extends PTransform<PCollection<T>, PDone> {
+      public static final String TRANSFORM_URN = "beam:transform:spark:consoleio_write_unbound:v1";
 
       private final int num;
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 9d58eec..b352a3a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -87,6 +87,7 @@ import org.joda.time.Instant;
  */
 //TODO: write a proper Builder enforcing all those rules mentioned.
 public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
+  public static final String TRANSFORM_URN = "beam:transform:spark:createstream:v1";
 
   private final Duration batchDuration;
   private final Queue<Iterable<TimestampedValue<T>>> batches = new ArrayDeque<>();
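
Both Spark-only transforms get runner-scoped URNs (note the "spark"
segment in "beam:transform:spark:...:v1"); the registrar added at the
bottom of StreamingTransformTranslator.java below wires these URNs into
PTransformTranslation.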
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
index 5ef8e28..cc40af5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
@@ -22,11 +22,11 @@ import org.apache.beam.sdk.transforms.PTransform;
 /** Translator to support translation between Beam transformations and Spark transformations. */
 public interface SparkPipelineTranslator {
 
-  boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz);
+  boolean hasTranslation(PTransform<?, ?> transform);
 
   <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(
-      Class<TransformT> clazz);
+      PTransform<?, ?> transform);
 
   <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(
-      Class<TransformT> clazz);
+      PTransform<?, ?> transform);
 }
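
The translator interface now takes live PTransform instances instead of
Class tokens, because a URN can only be derived from an instance. A
minimal caller sketch, assuming a boolean isBounded computed from the
node's PCollections as SparkRunner does above:

    // Before: translator.translateBounded(transformClass);
    // After: the instance carries enough information to resolve its URN.
    TransformEvaluator<TransformT> evaluator =
        isBounded
            ? translator.translateBounded(transform)
            : translator.translateUnbounded(transform);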
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 332bc2d..d075943 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -24,10 +24,12 @@ import static org.apache.beam.runners.spark.translation.TranslationUtils.avoidRd
 import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
@@ -554,48 +556,51 @@ public final class TransformTranslator {
     };
   }
 
-  private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS =
-      Maps.newHashMap();
+  private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap<>();
 
   static {
-    EVALUATORS.put(Read.Bounded.class, readBounded());
-    EVALUATORS.put(ParDo.MultiOutput.class, parDo());
-    EVALUATORS.put(GroupByKey.class, groupByKey());
-    EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
-    EVALUATORS.put(Combine.Globally.class, combineGlobally());
-    EVALUATORS.put(Combine.PerKey.class, combinePerKey());
-    EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
-    //    EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
-    //    EVALUATORS.put(View.AsIterable.class, viewAsIter());
-    EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
-    EVALUATORS.put(Window.Assign.class, window());
-    EVALUATORS.put(Reshuffle.class, reshuffle());
+    EVALUATORS.put(PTransformTranslation.READ_TRANSFORM_URN, readBounded());
+    EVALUATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDo());
+    EVALUATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, groupByKey());
+    EVALUATORS.put(PTransformTranslation.COMBINE_GROUPED_VALUES_TRANSFORM_URN, combineGrouped());
+    EVALUATORS.put(PTransformTranslation.COMBINE_GLOBALLY_TRANSFORM_URN, combineGlobally());
+    EVALUATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, combinePerKey());
+    EVALUATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, flattenPColl());
+    EVALUATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, createPCollView());
+    EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
+    EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
+  }
+
+  @Nullable
+  private static TransformEvaluator<?> getTranslator(PTransform<?, ?> transform) {
+    @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
+    return urn == null ? null : EVALUATORS.get(urn);
   }
 
   /** Translator matches Beam transformation with the appropriate evaluator. */
   public static class Translator implements SparkPipelineTranslator {
 
     @Override
-    public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
-      return EVALUATORS.containsKey(clazz);
+    public boolean hasTranslation(PTransform<?, ?> transform) {
+      return EVALUATORS.containsKey(PTransformTranslation.urnForTransformOrNull(transform));
     }
 
     @Override
     public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(
-        Class<TransformT> clazz) {
+        PTransform<?, ?> transform) {
       @SuppressWarnings("unchecked")
       TransformEvaluator<TransformT> transformEvaluator =
-          (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
+          (TransformEvaluator<TransformT>) getTranslator(transform);
       checkState(
           transformEvaluator != null,
           "No TransformEvaluator registered for BOUNDED transform %s",
-          clazz);
+          transform);
       return transformEvaluator;
     }
 
     @Override
     public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(
-        Class<TransformT> clazz) {
+        PTransform<?, ?> transform) {
       throw new IllegalStateException(
           "TransformTranslator used in a batch pipeline only " + "supports BOUNDED transforms.");
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index f20eda5..fab3641 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -21,17 +21,21 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
 
+import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.ConsoleIO;
@@ -231,7 +235,7 @@ public final class StreamingTransformTranslator {
             // create a single RDD stream.
             Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
             q.offer(((BoundedDataset) dataset).getRDD());
-            //TODO: this is not recoverable from checkpoint!
+            // TODO: this is not recoverable from checkpoint!
             JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
             dStreams.add(dStream);
           }
@@ -493,19 +497,24 @@ public final class StreamingTransformTranslator {
     };
   }
 
-  private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS =
-      Maps.newHashMap();
+  private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap<>();
 
   static {
-    EVALUATORS.put(Read.Unbounded.class, readUnbounded());
-    EVALUATORS.put(GroupByKey.class, groupByKey());
-    EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
-    EVALUATORS.put(ParDo.MultiOutput.class, parDo());
-    EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
-    EVALUATORS.put(CreateStream.class, createFromQueue());
-    EVALUATORS.put(Window.Assign.class, window());
-    EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
-    EVALUATORS.put(Reshuffle.class, reshuffle());
+    EVALUATORS.put(PTransformTranslation.READ_TRANSFORM_URN, readUnbounded());
+    EVALUATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, groupByKey());
+    EVALUATORS.put(PTransformTranslation.COMBINE_GROUPED_VALUES_TRANSFORM_URN, combineGrouped());
+    EVALUATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDo());
+    EVALUATORS.put(ConsoleIO.Write.Unbound.TRANSFORM_URN, print());
+    EVALUATORS.put(CreateStream.TRANSFORM_URN, createFromQueue());
+    EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
+    EVALUATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, flattenPColl());
+    EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
+  }
+
+  @Nullable
+  private static TransformEvaluator<?> getTranslator(PTransform<?, ?> transform) {
+    @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
+    return urn == null ? null : EVALUATORS.get(urn);
   }
 
   /** Translator matches Beam transformation with the appropriate evaluator. */
@@ -518,33 +527,67 @@ public final class StreamingTransformTranslator {
     }
 
     @Override
-    public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
+    public boolean hasTranslation(PTransform<?, ?> transform) {
       // streaming includes rdd/bounded transformations as well
-      return EVALUATORS.containsKey(clazz);
+      return EVALUATORS.containsKey(PTransformTranslation.urnForTransformOrNull(transform));
     }
 
     @Override
     public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(
-        Class<TransformT> clazz) {
-      TransformEvaluator<TransformT> transformEvaluator = batchTranslator.translateBounded(clazz);
+        PTransform<?, ?> transform) {
+      TransformEvaluator<TransformT> transformEvaluator =
+          batchTranslator.translateBounded(transform);
       checkState(
           transformEvaluator != null,
           "No TransformEvaluator registered for BOUNDED transform %s",
-          clazz);
+          transform);
       return transformEvaluator;
     }
 
     @Override
     public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(
-        Class<TransformT> clazz) {
+        PTransform<?, ?> transform) {
       @SuppressWarnings("unchecked")
       TransformEvaluator<TransformT> transformEvaluator =
-          (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
+          (TransformEvaluator<TransformT>) getTranslator(transform);
       checkState(
           transformEvaluator != null,
           "No TransformEvaluator registered for UNBOUNDED transform %s",
-          clazz);
+          transform);
       return transformEvaluator;
     }
   }
+
+  /** Registers classes specialized by the Spark runner. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class SparkTransformsRegistrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap.of(
+          ConsoleIO.Write.Unbound.class, new SparkConsoleIOWriteUnboundedPayloadTranslator(),
+          CreateStream.class, new SparkCreateStreamPayloadTranslator());
+    }
+  }
+
+  private static class SparkConsoleIOWriteUnboundedPayloadTranslator
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
+          ConsoleIO.Write.Unbound<?>> {
+
+    @Override
+    public String getUrn(ConsoleIO.Write.Unbound<?> transform) {
+      return ConsoleIO.Write.Unbound.TRANSFORM_URN;
+    }
+  }
+
+  private static class SparkCreateStreamPayloadTranslator
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<CreateStream<?>> {
+
+    @Override
+    public String getUrn(CreateStream<?> transform) {
+      return CreateStream.TRANSFORM_URN;
+    }
+  }
 }
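
ConsoleIO.Write.Unbound and CreateStream have no counterpart in the
portable protos, so the commit assigns them their own URNs and registers
NotSerializable payload translators for them. @AutoService generates the
META-INF/services entry through which PTransformTranslation discovers the
registrar via java.util.ServiceLoader. Once registered, the URN resolves
from an instance like any other; a sketch, assuming a CreateStream<String>
named source:

    String urn = PTransformTranslation.urnForTransformOrNull(source);
    // urn is CreateStream.TRANSFORM_URN, "beam:transform:spark:createstream:v1"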
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 2d12cd8..f84945f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -84,7 +84,7 @@ public class SparkRunnerDebuggerTest {
             + "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
             + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
             + "_.groupByKey()\n"
-            + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
+            + "_.mapPartitions(new org.apache.beam.sdk.transforms.Combine$GroupedValues$1())\n"
             + "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
             + "sparkContext.union(...)\n"
             + "_.mapPartitions("
@@ -141,7 +141,7 @@ public class SparkRunnerDebuggerTest {
             + "SparkRunnerDebuggerTest$FormatKVFn())\n"
             + "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
             + "_.groupByKey()\n"
-            + "_.map(new org.apache.beam.sdk.transforms.Combine$IterableCombineFn())\n"
+            + "_.mapPartitions(new org.apache.beam.sdk.transforms.Combine$GroupedValues$1())\n"
             + "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$3())\n"
             + "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
             + "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>";