You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/12 22:09:05 UTC
[beam] branch master updated: [BEAM-6037] Make Spark runner pipeline translation based on URNs (#7005)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9718c3e [BEAM-6037] Make Spark runner pipeline translation based on URNs (#7005)
9718c3e is described below
commit 9718c3e6822b204a99e0d7b5e9964f4276a75656
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Mon Nov 12 23:08:58 2018 +0100
[BEAM-6037] Make Spark runner pipeline translation based on URNs (#7005)
---
.../core/construction/PTransformTranslation.java | 3 +
.../runners/spark/SparkNativePipelineVisitor.java | 4 +-
.../org/apache/beam/runners/spark/SparkRunner.java | 22 +++---
.../apache/beam/runners/spark/io/ConsoleIO.java | 1 +
.../apache/beam/runners/spark/io/CreateStream.java | 1 +
.../spark/translation/SparkPipelineTranslator.java | 6 +-
.../spark/translation/TransformTranslator.java | 47 ++++++------
.../streaming/StreamingTransformTranslator.java | 85 ++++++++++++++++------
.../runners/spark/SparkRunnerDebuggerTest.java | 4 +-
9 files changed, 110 insertions(+), 63 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index bf4ffbd..86a414c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.CombineComponents;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.SplittableParDoComponents;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslator;
import org.apache.beam.sdk.Pipeline;
@@ -88,6 +89,8 @@ public class PTransformTranslation {
getUrn(StandardPTransforms.Composites.COMBINE_PER_KEY);
public static final String COMBINE_GLOBALLY_TRANSFORM_URN =
getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY);
+ public static final String COMBINE_GROUPED_VALUES_TRANSFORM_URN =
+ getUrn(CombineComponents.COMBINE_GROUPED_VALUES);
public static final String RESHUFFLE_URN = getUrn(StandardPTransforms.Composites.RESHUFFLE);
public static final String WRITE_FILES_TRANSFORM_URN =
getUrn(StandardPTransforms.Composites.WRITE_FILES);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
index 8e2f917..4662d81 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -91,9 +91,7 @@ public class SparkNativePipelineVisitor extends SparkRunner.Evaluator {
@SuppressWarnings("unchecked")
TransformT transform = (TransformT) node.getTransform();
@SuppressWarnings("unchecked")
- Class<TransformT> transformClass = (Class<TransformT>) transform.getClass();
- @SuppressWarnings("unchecked")
- TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
+ TransformEvaluator<TransformT> evaluator = translate(node, transform);
if (shouldDebug(node)) {
transforms.add(new NativeTransform(node, evaluator, transform, false));
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index ca86bde..1eeb1c1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -374,13 +374,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- if (node.getTransform() != null) {
- @SuppressWarnings("unchecked")
- Class<PTransform<?, ?>> transformClass =
- (Class<PTransform<?, ?>>) node.getTransform().getClass();
- if (translator.hasTranslation(transformClass) && !shouldDefer(node)) {
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null) {
+ if (translator.hasTranslation(transform) && !shouldDefer(node)) {
LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
- LOG.debug("Composite transform class: '{}'", transformClass);
+ LOG.debug("Composite transform class: '{}'", transform);
doVisitTransform(node);
return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
}
@@ -433,9 +431,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
@SuppressWarnings("unchecked")
TransformT transform = (TransformT) node.getTransform();
@SuppressWarnings("unchecked")
- Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
- @SuppressWarnings("unchecked")
- TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
+ TransformEvaluator<TransformT> evaluator = translate(node, transform);
LOG.info("Evaluating {}", transform);
AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(getPipeline());
ctxt.setCurrentTransform(appliedTransform);
@@ -449,8 +445,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
*/
protected <TransformT extends PTransform<? super PInput, POutput>>
TransformEvaluator<TransformT> translate(
- TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
- //--- determine if node is bounded/unbounded.
+ TransformHierarchy.Node node, TransformT transform) {
+ // --- determine if node is bounded/unbounded.
// usually, the input determines if the PCollection to apply the next transformation to
// is BOUNDED or UNBOUNDED, meaning RDD/DStream.
Map<TupleTag<?>, PValue> pValues;
@@ -464,8 +460,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
// translate accordingly.
LOG.debug("Translating {} as {}", transform, isNodeBounded);
return isNodeBounded.equals(PCollection.IsBounded.BOUNDED)
- ? translator.translateBounded(transformClass)
- : translator.translateUnbounded(transformClass);
+ ? translator.translateBounded(transform)
+ : translator.translateUnbounded(transform);
}
protected PCollection.IsBounded isBoundedCollection(Collection<PValue> pValues) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
index dd15712..bad6dfb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
@@ -49,6 +49,7 @@ public final class ConsoleIO {
* @param <T> the type of the elements in the {@link PCollection}
*/
public static class Unbound<T> extends PTransform<PCollection<T>, PDone> {
+ public static final String TRANSFORM_URN = "beam:transform:spark:consoleio_write_unbound:v1";
private final int num;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 9d58eec..b352a3a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -87,6 +87,7 @@ import org.joda.time.Instant;
*/
//TODO: write a proper Builder enforcing all those rules mentioned.
public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
+ public static final String TRANSFORM_URN = "beam:transform:spark:createstream:v1";
private final Duration batchDuration;
private final Queue<Iterable<TimestampedValue<T>>> batches = new ArrayDeque<>();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
index 5ef8e28..cc40af5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
@@ -22,11 +22,11 @@ import org.apache.beam.sdk.transforms.PTransform;
/** Translator to support translation between Beam transformations and Spark transformations. */
public interface SparkPipelineTranslator {
- boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz);
+ boolean hasTranslation(PTransform<?, ?> transform);
<TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(
- Class<TransformT> clazz);
+ PTransform<?, ?> transform);
<TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(
- Class<TransformT> clazz);
+ PTransform<?, ?> transform);
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 332bc2d..d075943 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -24,10 +24,12 @@ import static org.apache.beam.runners.spark.translation.TranslationUtils.avoidRd
import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
@@ -554,48 +556,51 @@ public final class TransformTranslator {
};
}
- private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS =
- Maps.newHashMap();
+ private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap<>();
static {
- EVALUATORS.put(Read.Bounded.class, readBounded());
- EVALUATORS.put(ParDo.MultiOutput.class, parDo());
- EVALUATORS.put(GroupByKey.class, groupByKey());
- EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
- EVALUATORS.put(Combine.Globally.class, combineGlobally());
- EVALUATORS.put(Combine.PerKey.class, combinePerKey());
- EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
- // EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
- // EVALUATORS.put(View.AsIterable.class, viewAsIter());
- EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
- EVALUATORS.put(Window.Assign.class, window());
- EVALUATORS.put(Reshuffle.class, reshuffle());
+ EVALUATORS.put(PTransformTranslation.READ_TRANSFORM_URN, readBounded());
+ EVALUATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDo());
+ EVALUATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, groupByKey());
+ EVALUATORS.put(PTransformTranslation.COMBINE_GROUPED_VALUES_TRANSFORM_URN, combineGrouped());
+ EVALUATORS.put(PTransformTranslation.COMBINE_GLOBALLY_TRANSFORM_URN, combineGlobally());
+ EVALUATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, combinePerKey());
+ EVALUATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, flattenPColl());
+ EVALUATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, createPCollView());
+ EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
+ EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
+ }
+
+ @Nullable
+ private static TransformEvaluator<?> getTranslator(PTransform<?, ?> transform) {
+ @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
+ return urn == null ? null : EVALUATORS.get(urn);
}
/** Translator matches Beam transformation with the appropriate evaluator. */
public static class Translator implements SparkPipelineTranslator {
@Override
- public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
- return EVALUATORS.containsKey(clazz);
+ public boolean hasTranslation(PTransform<?, ?> transform) {
+ return EVALUATORS.containsKey(PTransformTranslation.urnForTransformOrNull(transform));
}
@Override
public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(
- Class<TransformT> clazz) {
+ PTransform<?, ?> transform) {
@SuppressWarnings("unchecked")
TransformEvaluator<TransformT> transformEvaluator =
- (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
+ (TransformEvaluator<TransformT>) getTranslator(transform);
checkState(
transformEvaluator != null,
"No TransformEvaluator registered for BOUNDED transform %s",
- clazz);
+ transform);
return transformEvaluator;
}
@Override
public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(
- Class<TransformT> clazz) {
+ PTransform<?, ?> transform) {
throw new IllegalStateException(
"TransformTranslator used in a batch pipeline only " + "supports BOUNDED transforms.");
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index f20eda5..fab3641 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -21,17 +21,21 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
+import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.ConsoleIO;
@@ -231,7 +235,7 @@ public final class StreamingTransformTranslator {
// create a single RDD stream.
Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
q.offer(((BoundedDataset) dataset).getRDD());
- //TODO: this is not recoverable from checkpoint!
+ // TODO: this is not recoverable from checkpoint!
JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
dStreams.add(dStream);
}
@@ -493,19 +497,24 @@ public final class StreamingTransformTranslator {
};
}
- private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS =
- Maps.newHashMap();
+ private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap<>();
static {
- EVALUATORS.put(Read.Unbounded.class, readUnbounded());
- EVALUATORS.put(GroupByKey.class, groupByKey());
- EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
- EVALUATORS.put(ParDo.MultiOutput.class, parDo());
- EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
- EVALUATORS.put(CreateStream.class, createFromQueue());
- EVALUATORS.put(Window.Assign.class, window());
- EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
- EVALUATORS.put(Reshuffle.class, reshuffle());
+ EVALUATORS.put(PTransformTranslation.READ_TRANSFORM_URN, readUnbounded());
+ EVALUATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, groupByKey());
+ EVALUATORS.put(PTransformTranslation.COMBINE_GROUPED_VALUES_TRANSFORM_URN, combineGrouped());
+ EVALUATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDo());
+ EVALUATORS.put(ConsoleIO.Write.Unbound.TRANSFORM_URN, print());
+ EVALUATORS.put(CreateStream.TRANSFORM_URN, createFromQueue());
+ EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
+ EVALUATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, flattenPColl());
+ EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
+ }
+
+ @Nullable
+ private static TransformEvaluator<?> getTranslator(PTransform<?, ?> transform) {
+ @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
+ return urn == null ? null : EVALUATORS.get(urn);
}
/** Translator matches Beam transformation with the appropriate evaluator. */
@@ -518,33 +527,67 @@ public final class StreamingTransformTranslator {
}
@Override
- public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
+ public boolean hasTranslation(PTransform<?, ?> transform) {
// streaming includes rdd/bounded transformations as well
- return EVALUATORS.containsKey(clazz);
+ return EVALUATORS.containsKey(PTransformTranslation.urnForTransformOrNull(transform));
}
@Override
public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(
- Class<TransformT> clazz) {
- TransformEvaluator<TransformT> transformEvaluator = batchTranslator.translateBounded(clazz);
+ PTransform<?, ?> transform) {
+ TransformEvaluator<TransformT> transformEvaluator =
+ batchTranslator.translateBounded(transform);
checkState(
transformEvaluator != null,
"No TransformEvaluator registered for BOUNDED transform %s",
- clazz);
+ transform);
return transformEvaluator;
}
@Override
public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(
- Class<TransformT> clazz) {
+ PTransform<?, ?> transform) {
@SuppressWarnings("unchecked")
TransformEvaluator<TransformT> transformEvaluator =
- (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
+ (TransformEvaluator<TransformT>) getTranslator(transform);
checkState(
transformEvaluator != null,
"No TransformEvaluator registered for UNBOUNDED transform %s",
- clazz);
+ transform);
return transformEvaluator;
}
}
+
+ /** Registers classes specialized by the Spark runner. */
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class SparkTransformsRegistrar implements TransformPayloadTranslatorRegistrar {
+ @Override
+ public Map<
+ ? extends Class<? extends PTransform>,
+ ? extends PTransformTranslation.TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return ImmutableMap.of(
+ ConsoleIO.Write.Unbound.class, new SparkConsoleIOWriteUnboundedPayloadTranslator(),
+ CreateStream.class, new SparkCreateStreamPayloadTranslator());
+ }
+ }
+
+ private static class SparkConsoleIOWriteUnboundedPayloadTranslator
+ extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
+ ConsoleIO.Write.Unbound<?>> {
+
+ @Override
+ public String getUrn(ConsoleIO.Write.Unbound<?> transform) {
+ return ConsoleIO.Write.Unbound.TRANSFORM_URN;
+ }
+ }
+
+ private static class SparkCreateStreamPayloadTranslator
+ extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<CreateStream<?>> {
+
+ @Override
+ public String getUrn(CreateStream<?> transform) {
+ return CreateStream.TRANSFORM_URN;
+ }
+ }
}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 2d12cd8..f84945f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -84,7 +84,7 @@ public class SparkRunnerDebuggerTest {
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
+ "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
+ "_.groupByKey()\n"
- + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
+ + "_.mapPartitions(new org.apache.beam.sdk.transforms.Combine$GroupedValues$1())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
+ "sparkContext.union(...)\n"
+ "_.mapPartitions("
@@ -141,7 +141,7 @@ public class SparkRunnerDebuggerTest {
+ "SparkRunnerDebuggerTest$FormatKVFn())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
+ "_.groupByKey()\n"
- + "_.map(new org.apache.beam.sdk.transforms.Combine$IterableCombineFn())\n"
+ + "_.mapPartitions(new org.apache.beam.sdk.transforms.Combine$GroupedValues$1())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$3())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
+ "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>";