You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:26 UTC
[beam] 04/50: Start pipeline translation
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1c977888ed4a3572a7af8d476fbcaa48cc36e5b4
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Nov 16 19:06:04 2018 +0100
Start pipeline translation
---
.../structuredstreaming/SparkPipelineResult.java | 29 +++
.../spark/structuredstreaming/SparkRunner.java | 108 +++++++++
.../translation/BatchPipelineTranslator.java | 20 ++
.../translation/EvaluationContext.java | 261 ---------------------
.../translation/PipelineTranslator.java | 94 ++++++++
.../translation/SparkTransformOverrides.java | 52 ++++
.../translation/StreamingPipelineTranslator.java | 5 +
7 files changed, 308 insertions(+), 261 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java
new file mode 100644
index 0000000..82d1b90
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java
@@ -0,0 +1,29 @@
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+
+/**
+ * Result handle returned by {@link SparkRunner#run}.
+ *
+ * <p>NOTE(review): this is still a stub. No job state or metrics are tracked, and every accessor
+ * returns {@code null}, so callers must not dereference the returned values yet.
+ */
+public class SparkPipelineResult implements PipelineResult {
+
+  /** Stand-in for the job state until state tracking is implemented. */
+  private static final State UNTRACKED_STATE = null;
+
+  @Override
+  public State getState() {
+    return UNTRACKED_STATE;
+  }
+
+  @Override
+  public State cancel() throws IOException {
+    // Cancellation is not wired up yet.
+    return UNTRACKED_STATE;
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    return UNTRACKED_STATE;
+  }
+
+  @Override
+  public State waitUntilFinish() {
+    return UNTRACKED_STATE;
+  }
+
+  @Override
+  public MetricResults metrics() {
+    // No metrics are collected yet.
+    return null;
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
new file mode 100644
index 0000000..62cd7d3
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -0,0 +1,108 @@
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.BatchPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.StreamingPipelineTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The SparkRunner translates the operations defined on a pipeline into a representation executable
+ * by Spark, and then submits the job to Spark to be executed. If we wanted to run a Beam pipeline
+ * with the default options of a spark instance in local mode, we would do the following:
+ *
+ * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineResult result =
+ * (SparkPipelineResult) p.run(); }
+ *
+ * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url
+ * we would do the following:
+ *
+ * <p>{@code SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ * options.setSparkMaster("spark://host:port"); Pipeline p = Pipeline.create(options); [logic for
+ * pipeline creation] SparkPipelineResult result = (SparkPipelineResult) p.run(); }
+ */
+public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
+
+  /** Options used in this pipeline runner. */
+  private final SparkPipelineOptions options;
+
+  /**
+   * Creates and returns a new SparkRunner with default options. In particular, against a spark
+   * instance running in local mode.
+   *
+   * @return A pipeline runner with default options.
+   */
+  public static SparkRunner create() {
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    return new SparkRunner(options);
+  }
+
+  /**
+   * Creates and returns a new SparkRunner with specified options.
+   *
+   * @param options The SparkPipelineOptions to use when executing the job.
+   * @return A pipeline runner that will execute with specified options.
+   */
+  public static SparkRunner create(SparkPipelineOptions options) {
+    return new SparkRunner(options);
+  }
+
+  /**
+   * Creates and returns a new SparkRunner with specified options.
+   *
+   * <p>The options are validated as {@link SparkPipelineOptions}. When no files to stage were
+   * configured, the classpath resources of this runner's class loader are used instead.
+   *
+   * @param options The PipelineOptions to use when executing the job.
+   * @return A pipeline runner that will execute with specified options.
+   */
+  public static SparkRunner fromOptions(PipelineOptions options) {
+    SparkPipelineOptions sparkOptions =
+        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+
+    if (sparkOptions.getFilesToStage() == null) {
+      sparkOptions.setFilesToStage(
+          detectClassPathResourcesToStage(SparkRunner.class.getClassLoader()));
+      LOG.info(
+          "PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be staged.",
+          sparkOptions.getFilesToStage().size());
+      LOG.debug("Classpath elements: {}", sparkOptions.getFilesToStage());
+    }
+
+    return new SparkRunner(sparkOptions);
+  }
+
+  /**
+   * Private constructor; use {@link #create()}, {@link #create(SparkPipelineOptions)} or {@link
+   * #fromOptions(PipelineOptions)} to obtain a runner.
+   *
+   * @param options the options this runner translates and executes pipelines with
+   */
+  private SparkRunner(SparkPipelineOptions options) {
+    this.options = options;
+  }
+
+  @Override
+  public SparkPipelineResult run(final Pipeline pipeline) {
+    translatePipeline(pipeline);
+    executePipeline(pipeline);
+    return new SparkPipelineResult();
+  }
+
+  /**
+   * Detects the translation mode, applies the runner's transform overrides, prepares the files to
+   * stage, and then translates the pipeline with the translator matching the detected mode.
+   */
+  private void translatePipeline(Pipeline pipeline) {
+    // Mode detection must run first: it may switch the options to streaming, and both the
+    // overrides and the translator choice below read options.isStreaming().
+    PipelineTranslator.detectTranslationMode(pipeline, options);
+    PipelineTranslator.replaceTransforms(pipeline, options);
+    PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
+    // TODO: init translator with subclass based on mode and env.
+    PipelineTranslator translator =
+        options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
+    translator.translate(pipeline);
+  }
+
+  /** Placeholder for pipeline execution; it currently does nothing. */
+  public void executePipeline(Pipeline pipeline) {}
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java
new file mode 100644
index 0000000..e66555c
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java
@@ -0,0 +1,20 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PValue;
+
+public class BatchPipelineTranslator extends PipelineTranslator {
+
+
+ @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ return super.enterCompositeTransform(node);
+ }
+
+
+ @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ super.visitPrimitiveTransform(node);
+ }
+
+
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
deleted file mode 100644
index 47a3098..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.core.construction.TransformInputs;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-/**
- * The EvaluationContext allows us to define pipeline instructions and translate between {@code
- * PObject<T>}s or {@code PCollection<T>}s and Ts or DStreams/RDDs of Ts.
- */
-public class EvaluationContext {
- private SparkSession sparkSession;
- private final Pipeline pipeline;
- private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
- private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
- private final Set<Dataset> leaves = new LinkedHashSet<>();
- private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
- private AppliedPTransform<?, ?, ?> currentTransform;
- private final SparkPCollectionView pviews = new SparkPCollectionView();
- private final Map<PCollection, Long> cacheCandidates = new HashMap<>();
- private final PipelineOptions options;
- private final SerializablePipelineOptions serializableOptions;
-
- public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options) {
- this.jsc = jsc;
- this.pipeline = pipeline;
- this.options = options;
- this.serializableOptions = new SerializablePipelineOptions(options);
- }
-
- public EvaluationContext(
- JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options, JavaStreamingContext jssc) {
- this(jsc, pipeline, options);
- this.jssc = jssc;
- }
-
- public JavaSparkContext getSparkContext() {
- return jsc;
- }
-
- public JavaStreamingContext getStreamingContext() {
- return jssc;
- }
-
- public Pipeline getPipeline() {
- return pipeline;
- }
-
- public PipelineOptions getOptions() {
- return options;
- }
-
- public SerializablePipelineOptions getSerializableOptions() {
- return serializableOptions;
- }
-
- public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
- this.currentTransform = transform;
- }
-
- public AppliedPTransform<?, ?, ?> getCurrentTransform() {
- return currentTransform;
- }
-
- public <T extends PValue> T getInput(PTransform<T, ?> transform) {
- @SuppressWarnings("unchecked")
- T input =
- (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
- return input;
- }
-
- public <T> Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) {
- checkArgument(currentTransform != null, "can only be called with non-null currentTransform");
- checkArgument(
- currentTransform.getTransform() == transform, "can only be called with current transform");
- return currentTransform.getInputs();
- }
-
- public <T extends PValue> T getOutput(PTransform<?, T> transform) {
- @SuppressWarnings("unchecked")
- T output = (T) Iterables.getOnlyElement(getOutputs(transform).values());
- return output;
- }
-
- public Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
- checkArgument(currentTransform != null, "can only be called with non-null currentTransform");
- checkArgument(
- currentTransform.getTransform() == transform, "can only be called with current transform");
- return currentTransform.getOutputs();
- }
-
- public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
- return currentTransform
- .getOutputs()
- .entrySet()
- .stream()
- .filter(e -> e.getValue() instanceof PCollection)
- .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
- }
-
- private boolean shouldCache(PValue pvalue) {
- if ((pvalue instanceof PCollection)
- && cacheCandidates.containsKey(pvalue)
- && cacheCandidates.get(pvalue) > 1) {
- return true;
- }
- return false;
- }
-
- public void putDataset(
- PTransform<?, ? extends PValue> transform, Dataset dataset, boolean forceCache) {
- putDataset(getOutput(transform), dataset, forceCache);
- }
-
- public void putDataset(PTransform<?, ? extends PValue> transform, Dataset dataset) {
- putDataset(transform, dataset, false);
- }
-
- public void putDataset(PValue pvalue, Dataset dataset, boolean forceCache) {
- try {
- dataset.setName(pvalue.getName());
- } catch (IllegalStateException e) {
- // name not set, ignore
- }
- if ((forceCache || shouldCache(pvalue)) && pvalue instanceof PCollection) {
- // we cache only PCollection
- Coder<?> coder = ((PCollection<?>) pvalue).getCoder();
- Coder<? extends BoundedWindow> wCoder =
- ((PCollection<?>) pvalue).getWindowingStrategy().getWindowFn().windowCoder();
- dataset.cache(storageLevel(), WindowedValue.getFullCoder(coder, wCoder));
- }
- datasets.put(pvalue, dataset);
- leaves.add(dataset);
- }
-
- public Dataset borrowDataset(PTransform<? extends PValue, ?> transform) {
- return borrowDataset(getInput(transform));
- }
-
- public Dataset borrowDataset(PValue pvalue) {
- Dataset dataset = datasets.get(pvalue);
- leaves.remove(dataset);
- return dataset;
- }
-
- /**
- * Computes the outputs for all RDDs that are leaves in the DAG and do not have any actions (like
- * saving to a file) registered on them (i.e. they are performed for side effects).
- */
- public void computeOutputs() {
- for (Dataset dataset : leaves) {
- dataset.action(); // force computation.
- }
- }
-
- /**
- * Retrieve an object of Type T associated with the PValue passed in.
- *
- * @param value PValue to retrieve associated data for.
- * @param <T> Type of object to return.
- * @return Native object.
- */
- @SuppressWarnings("TypeParameterUnusedInFormals")
- public <T> T get(PValue value) {
- if (pobjects.containsKey(value)) {
- T result = (T) pobjects.get(value);
- return result;
- }
- if (pcollections.containsKey(value)) {
- JavaRDD<?> rdd = ((BoundedDataset) pcollections.get(value)).getRDD();
- T res = (T) Iterables.getOnlyElement(rdd.collect());
- pobjects.put(value, res);
- return res;
- }
- throw new IllegalStateException("Cannot resolve un-known PObject: " + value);
- }
-
- /**
- * Return the current views creates in the pipeline.
- *
- * @return SparkPCollectionView
- */
- public SparkPCollectionView getPViews() {
- return pviews;
- }
-
- /**
- * Adds/Replaces a view to the current views creates in the pipeline.
- *
- * @param view - Identifier of the view
- * @param value - Actual value of the view
- * @param coder - Coder of the value
- */
- public void putPView(
- PCollectionView<?> view,
- Iterable<WindowedValue<?>> value,
- Coder<Iterable<WindowedValue<?>>> coder) {
- pviews.putPView(view, value, coder);
- }
-
- /**
- * Get the map of cache candidates hold by the evaluation context.
- *
- * @return The current {@link Map} of cache candidates.
- */
- public Map<PCollection, Long> getCacheCandidates() {
- return this.cacheCandidates;
- }
-
- <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
- @SuppressWarnings("unchecked")
- BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
- leaves.remove(boundedDataset);
- return boundedDataset.getValues(pcollection);
- }
-
- public String storageLevel() {
- return serializableOptions.get().as(SparkPipelineOptions.class).getStorageLevel();
- }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
new file mode 100644
index 0000000..f0ce1e5
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -0,0 +1,94 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import org.apache.beam.runners.core.construction.PipelineResources;
+import org.apache.beam.runners.spark.SparkTransformOverrides;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Does all the translation work: mode detection, transform replacement, preparation of the files
+ * to stage, and node translation.
+ *
+ * <p>Subclasses customize the visitor callbacks; {@link #translate(Pipeline)} drives the
+ * traversal.
+ */
+public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+
+  /**
+   * Spark master URLs that run inside the current JVM: {@code local}, {@code local[N]}, {@code
+   * local[*]}, {@code local[N,F]} and {@code local[*,F]}.
+   */
+  private static final String LOCAL_MASTER_REGEX = "local(\\[(\\*|\\d+)(,\\d+)?\\])?";
+
+  /**
+   * Translates the given pipeline by traversing it topologically with this translator as the
+   * visitor.
+   *
+   * @param pipeline the pipeline to translate
+   */
+  public void translate(Pipeline pipeline) {
+    pipeline.traverseTopologically(this);
+  }
+
+  /**
+   * Local configurations work in the same JVM and have no problems with improperly formatted files
+   * on classpath (eg. directories with .class files or empty directories). Prepare files for
+   * staging only when using remote cluster (passing the master address explicitly).
+   */
+  public static void prepareFilesToStageForRemoteClusterExecution(SparkPipelineOptions options) {
+    if (!options.getSparkMaster().matches(LOCAL_MASTER_REGEX)) {
+      options.setFilesToStage(
+          PipelineResources.prepareFilesForStaging(
+              options.getFilesToStage(), options.getTempLocation()));
+    }
+  }
+
+  /**
+   * Replaces transforms in the pipeline with the overrides from {@link SparkTransformOverrides},
+   * selected according to {@code options.isStreaming()}.
+   */
+  public static void replaceTransforms(Pipeline pipeline, SparkPipelineOptions options) {
+    pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(options.isStreaming()));
+  }
+
+  /**
+   * Visit the pipeline to determine the translation mode (batch/streaming) and update options
+   * accordingly. Streaming is only ever switched on here, never off.
+   */
+  public static void detectTranslationMode(Pipeline pipeline, SparkPipelineOptions options) {
+    TranslationModeDetector detector = new TranslationModeDetector();
+    pipeline.traverseTopologically(detector);
+    if (detector.getTranslationMode() == TranslationMode.STREAMING) {
+      // set streaming mode if it's a streaming pipeline
+      options.setStreaming(true);
+    }
+  }
+
+  /** The translation mode of the Beam Pipeline. */
+  private enum TranslationMode {
+
+    /** Uses the batch mode. */
+    BATCH,
+
+    /** Uses the streaming mode. */
+    STREAMING
+  }
+
+  /**
+   * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline: the mode
+   * becomes {@code STREAMING} as soon as an unbounded {@link PCollection} is visited.
+   */
+  private static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
+    private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
+
+    private TranslationMode translationMode;
+
+    TranslationModeDetector(TranslationMode defaultMode) {
+      this.translationMode = defaultMode;
+    }
+
+    TranslationModeDetector() {
+      this(TranslationMode.BATCH);
+    }
+
+    TranslationMode getTranslationMode() {
+      return translationMode;
+    }
+
+    @Override
+    public void visitValue(PValue value, TransformHierarchy.Node producer) {
+      // Once streaming has been detected there is nothing more to learn from further values.
+      if (translationMode == TranslationMode.BATCH) {
+        if (value instanceof PCollection
+            && ((PCollection) value).isBounded() == PCollection.IsBounded.UNBOUNDED) {
+          LOG.info(
+              "Found unbounded PCollection {}. Switching to streaming execution.", value.getName());
+          translationMode = TranslationMode.STREAMING;
+        }
+      }
+    }
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkTransformOverrides.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkTransformOverrides.java
new file mode 100644
index 0000000..897ac01
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkTransformOverrides.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/** {@link PTransform} overrides for Flink runner. */
+public class SparkTransformOverrides {
+ public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
+ ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder();
+ // TODO: [BEAM-5358] Support @RequiresStableInput on Spark runner
+ builder.add(
+ PTransformOverride.of(
+ PTransformMatchers.requiresStableInputParDoMulti(),
+ UnsupportedOverrideFactory.withMessage(
+ "Spark runner currently doesn't support @RequiresStableInput annotation.")));
+ if (!streaming) {
+ builder
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.splittableParDo(), new SplittableParDo.OverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.urnEqualTo(PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN),
+ new SplittableParDoNaiveBounded.OverrideFactory()));
+ }
+ return builder.build();
+ }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java
new file mode 100644
index 0000000..2058b37
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java
@@ -0,0 +1,5 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+public class StreamingPipelineTranslator extends PipelineTranslator {
+
+}