Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/27 15:39:14 UTC

[beam] 04/20: Start pipeline translation

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ad3230463eac22179cd50eade618d9c2879caf76
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Nov 16 19:06:04 2018 +0100

    Start pipeline translation
---
 .../structuredstreaming/SparkPipelineResult.java   |  29 +++
 .../spark/structuredstreaming/SparkRunner.java     | 108 +++++++++
 .../translation/BatchPipelineTranslator.java       |  20 ++
 .../translation/EvaluationContext.java             | 261 ---------------------
 .../translation/PipelineTranslator.java            |  94 ++++++++
 .../translation/SparkTransformOverrides.java       |  52 ++++
 .../translation/StreamingPipelineTranslator.java   |   5 +
 7 files changed, 308 insertions(+), 261 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java
new file mode 100644
index 0000000..82d1b90
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java
@@ -0,0 +1,29 @@
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+
+public class SparkPipelineResult implements PipelineResult {
+
+  @Override public State getState() {
+    return null;
+  }
+
+  @Override public State cancel() throws IOException {
+    return null;
+  }
+
+  @Override public State waitUntilFinish(Duration duration) {
+    return null;
+  }
+
+  @Override public State waitUntilFinish() {
+    return null;
+  }
+
+  @Override public MetricResults metrics() {
+    return null;
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
new file mode 100644
index 0000000..62cd7d3
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -0,0 +1,108 @@
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.BatchPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.StreamingPipelineTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The SparkRunner translates operations defined on a pipeline to a representation executable by
+ * Spark, and then submits the job to Spark to be executed. If we wanted to run a Beam pipeline
+ * with the default options of a single-threaded Spark instance in local mode, we would do the
+ * following:
+ *
+ * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineResult result =
+ * (SparkPipelineResult) p.run(); }
+ *
+ * <p>To create a pipeline runner that runs against a different Spark cluster, with a custom master
+ * URL, we would do the following:
+ *
+ * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineOptions options =
+ * SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port");
+ * SparkPipelineResult result = (SparkPipelineResult) p.run(); }
+ */
+public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
+
+  /** Options used in this pipeline runner. */
+  private final SparkPipelineOptions options;
+
+  /**
+   * Creates and returns a new SparkRunner with default options; in particular, it runs against a
+   * Spark instance in local mode.
+   *
+   * @return A pipeline runner with default options.
+   */
+  public static SparkRunner create() {
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    return new SparkRunner(options);
+  }
+
+  /**
+   * Creates and returns a new SparkRunner with specified options.
+   *
+   * @param options The SparkPipelineOptions to use when executing the job.
+   * @return A pipeline runner that will execute with specified options.
+   */
+  public static SparkRunner create(SparkPipelineOptions options) {
+    return new SparkRunner(options);
+  }
+
+  /**
+   * Creates and returns a new SparkRunner with specified options.
+   *
+   * @param options The PipelineOptions to use when executing the job.
+   * @return A pipeline runner that will execute with specified options.
+   */
+  public static SparkRunner fromOptions(PipelineOptions options) {
+    SparkPipelineOptions sparkOptions = PipelineOptionsValidator
+        .validate(SparkPipelineOptions.class, options);
+
+    if (sparkOptions.getFilesToStage() == null) {
+      sparkOptions.setFilesToStage(detectClassPathResourcesToStage(SparkRunner.class.getClassLoader()));
+      LOG.info("PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be staged.",
+          sparkOptions.getFilesToStage().size());
+      LOG.debug("Classpath elements: {}", sparkOptions.getFilesToStage());
+    }
+
+    return new SparkRunner(sparkOptions);
+  }
+
+  /**
+   * Creates a SparkRunner from the given options; with the default options the pipeline runs in
+   * Spark's local mode, in a single thread.
+   */
+  private SparkRunner(SparkPipelineOptions options) {
+    this.options = options;
+  }
+
+  @Override public SparkPipelineResult run(final Pipeline pipeline) {
+    translatePipeline(pipeline);
+    executePipeline(pipeline);
+    return new SparkPipelineResult();
+  }
+
+  private void translatePipeline(Pipeline pipeline) {
+    PipelineTranslator.detectTranslationMode(pipeline, options);
+    PipelineTranslator.replaceTransforms(pipeline, options);
+    PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
+    // Init the translator with the subclass matching the detected mode (batch or streaming).
+    PipelineTranslator translator =
+        options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
+    translator.translate(pipeline);
+  }
+
+  public void executePipeline(Pipeline pipeline) {}
+
+}
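
For reference, the usage condensed in the SparkRunner Javadoc above expands to roughly the following sketch. SparkPipelineOptions (and its setSparkMaster setter) is referenced by the Javadoc and by SparkRunner but is not part of this commit, so its exact shape is assumed here:

    // Sketch only: SparkPipelineOptions is assumed to exist with the setters
    // referenced by the Javadoc; it is not included in this commit.
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setSparkMaster("spark://host:port");

    Pipeline p = Pipeline.create(options);
    // ... apply transforms to p ...
    SparkPipelineResult result = (SparkPipelineResult) p.run();
    result.waitUntilFinish();
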
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java
new file mode 100644
index 0000000..e66555c
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java
@@ -0,0 +1,20 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PValue;
+
+public class BatchPipelineTranslator extends PipelineTranslator {
+
+  @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    return super.enterCompositeTransform(node);
+  }
+
+  @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    super.visitPrimitiveTransform(node);
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
deleted file mode 100644
index 47a3098..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.core.construction.TransformInputs;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-/**
- * The EvaluationContext allows us to define pipeline instructions and translate between {@code
- * PObject<T>}s or {@code PCollection<T>}s and Ts or DStreams/RDDs of Ts.
- */
-public class EvaluationContext {
-  private SparkSession sparkSession;
-  private final Pipeline pipeline;
-  private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
-  private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
-  private final Set<Dataset> leaves = new LinkedHashSet<>();
-  private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
-  private AppliedPTransform<?, ?, ?> currentTransform;
-  private final SparkPCollectionView pviews = new SparkPCollectionView();
-  private final Map<PCollection, Long> cacheCandidates = new HashMap<>();
-  private final PipelineOptions options;
-  private final SerializablePipelineOptions serializableOptions;
-
-  public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options) {
-    this.jsc = jsc;
-    this.pipeline = pipeline;
-    this.options = options;
-    this.serializableOptions = new SerializablePipelineOptions(options);
-  }
-
-  public EvaluationContext(
-      JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options, JavaStreamingContext jssc) {
-    this(jsc, pipeline, options);
-    this.jssc = jssc;
-  }
-
-  public JavaSparkContext getSparkContext() {
-    return jsc;
-  }
-
-  public JavaStreamingContext getStreamingContext() {
-    return jssc;
-  }
-
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  public PipelineOptions getOptions() {
-    return options;
-  }
-
-  public SerializablePipelineOptions getSerializableOptions() {
-    return serializableOptions;
-  }
-
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
-    this.currentTransform = transform;
-  }
-
-  public AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    return currentTransform;
-  }
-
-  public <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    @SuppressWarnings("unchecked")
-    T input =
-        (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
-    return input;
-  }
-
-  public <T> Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) {
-    checkArgument(currentTransform != null, "can only be called with non-null currentTransform");
-    checkArgument(
-        currentTransform.getTransform() == transform, "can only be called with current transform");
-    return currentTransform.getInputs();
-  }
-
-  public <T extends PValue> T getOutput(PTransform<?, T> transform) {
-    @SuppressWarnings("unchecked")
-    T output = (T) Iterables.getOnlyElement(getOutputs(transform).values());
-    return output;
-  }
-
-  public Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
-    checkArgument(currentTransform != null, "can only be called with non-null currentTransform");
-    checkArgument(
-        currentTransform.getTransform() == transform, "can only be called with current transform");
-    return currentTransform.getOutputs();
-  }
-
-  public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
-    return currentTransform
-        .getOutputs()
-        .entrySet()
-        .stream()
-        .filter(e -> e.getValue() instanceof PCollection)
-        .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
-  }
-
-  private boolean shouldCache(PValue pvalue) {
-    if ((pvalue instanceof PCollection)
-        && cacheCandidates.containsKey(pvalue)
-        && cacheCandidates.get(pvalue) > 1) {
-      return true;
-    }
-    return false;
-  }
-
-  public void putDataset(
-      PTransform<?, ? extends PValue> transform, Dataset dataset, boolean forceCache) {
-    putDataset(getOutput(transform), dataset, forceCache);
-  }
-
-  public void putDataset(PTransform<?, ? extends PValue> transform, Dataset dataset) {
-    putDataset(transform, dataset, false);
-  }
-
-  public void putDataset(PValue pvalue, Dataset dataset, boolean forceCache) {
-    try {
-      dataset.setName(pvalue.getName());
-    } catch (IllegalStateException e) {
-      // name not set, ignore
-    }
-    if ((forceCache || shouldCache(pvalue)) && pvalue instanceof PCollection) {
-      // we cache only PCollection
-      Coder<?> coder = ((PCollection<?>) pvalue).getCoder();
-      Coder<? extends BoundedWindow> wCoder =
-          ((PCollection<?>) pvalue).getWindowingStrategy().getWindowFn().windowCoder();
-      dataset.cache(storageLevel(), WindowedValue.getFullCoder(coder, wCoder));
-    }
-    datasets.put(pvalue, dataset);
-    leaves.add(dataset);
-  }
-
-  public Dataset borrowDataset(PTransform<? extends PValue, ?> transform) {
-    return borrowDataset(getInput(transform));
-  }
-
-  public Dataset borrowDataset(PValue pvalue) {
-    Dataset dataset = datasets.get(pvalue);
-    leaves.remove(dataset);
-    return dataset;
-  }
-
-  /**
-   * Computes the outputs for all RDDs that are leaves in the DAG and do not have any actions (like
-   * saving to a file) registered on them (i.e. they are performed for side effects).
-   */
-  public void computeOutputs() {
-    for (Dataset dataset : leaves) {
-      dataset.action(); // force computation.
-    }
-  }
-
-  /**
-   * Retrieve an object of Type T associated with the PValue passed in.
-   *
-   * @param value PValue to retrieve associated data for.
-   * @param <T> Type of object to return.
-   * @return Native object.
-   */
-  @SuppressWarnings("TypeParameterUnusedInFormals")
-  public <T> T get(PValue value) {
-    if (pobjects.containsKey(value)) {
-      T result = (T) pobjects.get(value);
-      return result;
-    }
-    if (pcollections.containsKey(value)) {
-      JavaRDD<?> rdd = ((BoundedDataset) pcollections.get(value)).getRDD();
-      T res = (T) Iterables.getOnlyElement(rdd.collect());
-      pobjects.put(value, res);
-      return res;
-    }
-    throw new IllegalStateException("Cannot resolve un-known PObject: " + value);
-  }
-
-  /**
-   * Return the current views creates in the pipeline.
-   *
-   * @return SparkPCollectionView
-   */
-  public SparkPCollectionView getPViews() {
-    return pviews;
-  }
-
-  /**
-   * Adds/Replaces a view to the current views creates in the pipeline.
-   *
-   * @param view - Identifier of the view
-   * @param value - Actual value of the view
-   * @param coder - Coder of the value
-   */
-  public void putPView(
-      PCollectionView<?> view,
-      Iterable<WindowedValue<?>> value,
-      Coder<Iterable<WindowedValue<?>>> coder) {
-    pviews.putPView(view, value, coder);
-  }
-
-  /**
-   * Get the map of cache candidates hold by the evaluation context.
-   *
-   * @return The current {@link Map} of cache candidates.
-   */
-  public Map<PCollection, Long> getCacheCandidates() {
-    return this.cacheCandidates;
-  }
-
-  <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked")
-    BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
-    leaves.remove(boundedDataset);
-    return boundedDataset.getValues(pcollection);
-  }
-
-  public String storageLevel() {
-    return serializableOptions.get().as(SparkPipelineOptions.class).getStorageLevel();
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
new file mode 100644
index 0000000..f0ce1e5
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -0,0 +1,94 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import org.apache.beam.runners.core.construction.PipelineResources;
+import org.apache.beam.runners.spark.SparkTransformOverrides;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Does all the translation work: translation-mode detection, transform replacement, and node
+ * translation.
+ */
+public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+
+  /**
+   * Local configurations work in the same JVM and have no problems with improperly formatted files
+   * on classpath (eg. directories with .class files or empty directories). Prepare files for
+   * staging only when using remote cluster (passing the master address explicitly).
+   */
+  public static void prepareFilesToStageForRemoteClusterExecution(SparkPipelineOptions options) {
+    if (!options.getSparkMaster().matches("local\\[?\\d*\\]?")) {
+      options.setFilesToStage(
+          PipelineResources.prepareFilesForStaging(
+              options.getFilesToStage(), options.getTempLocation()));
+    }
+  }
+
+  public static void replaceTransforms(Pipeline pipeline, SparkPipelineOptions options) {
+    pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(options.isStreaming()));
+  }
+
+  /** Visit the pipeline to determine the translation mode (batch/streaming) and update options accordingly. */
+  public static void detectTranslationMode(Pipeline pipeline, SparkPipelineOptions options) {
+    TranslationModeDetector detector = new TranslationModeDetector();
+    pipeline.traverseTopologically(detector);
+    if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
+      // set streaming mode if it's a streaming pipeline
+      options.setStreaming(true);
+    }
+  }
+
+  /** The translation mode of the Beam Pipeline. */
+  private enum TranslationMode {
+
+    /** Uses the batch mode. */
+    BATCH,
+
+    /** Uses the streaming mode. */
+    STREAMING
+  }
+
+  /** Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. */
+  private static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
+    private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
+
+    private TranslationMode translationMode;
+
+    TranslationModeDetector(TranslationMode defaultMode) {
+      this.translationMode = defaultMode;
+    }
+
+    TranslationModeDetector() {
+      this(TranslationMode.BATCH);
+    }
+
+    TranslationMode getTranslationMode() {
+      return translationMode;
+    }
+
+    @Override
+    public void visitValue(PValue value, TransformHierarchy.Node producer) {
+      if (translationMode.equals(TranslationMode.BATCH)) {
+        if (value instanceof PCollection
+            && ((PCollection) value).isBounded() == PCollection.IsBounded.UNBOUNDED) {
+          LOG.info(
+              "Found unbounded PCollection {}. Switching to streaming execution.", value.getName());
+          translationMode = TranslationMode.STREAMING;
+        }
+      }
+    }
+  }
+
+}
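
The detection above is purely structural: TranslationModeDetector starts in BATCH and switches to STREAMING as soon as it visits an unbounded PCollection, after which SparkRunner.translatePipeline picks StreamingPipelineTranslator instead of BatchPipelineTranslator. A minimal sketch of that flow (the pipeline contents are placeholders, not part of this commit):

    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    // ... apply only bounded sources/transforms to the pipeline ...
    PipelineTranslator.detectTranslationMode(pipeline, options);
    // options.isStreaming() stays false here; any PCollection with
    // IsBounded.UNBOUNDED in the graph would have flipped it to true.
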
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkTransformOverrides.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkTransformOverrides.java
new file mode 100644
index 0000000..897ac01
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkTransformOverrides.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/** {@link PTransform} overrides for the Spark runner. */
+public class SparkTransformOverrides {
+  public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
+    ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder();
+    // TODO: [BEAM-5358] Support @RequiresStableInput on Spark runner
+    builder.add(
+        PTransformOverride.of(
+            PTransformMatchers.requiresStableInputParDoMulti(),
+            UnsupportedOverrideFactory.withMessage(
+                "Spark runner currently doesn't support @RequiresStableInput annotation.")));
+    if (!streaming) {
+      builder
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.splittableParDo(), new SplittableParDo.OverrideFactory()))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.urnEqualTo(PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN),
+                  new SplittableParDoNaiveBounded.OverrideFactory()));
+    }
+    return builder.build();
+  }
+}
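
These overrides are consumed by PipelineTranslator.replaceTransforms above; for a batch pipeline the splittable ParDo overrides are registered alongside the @RequiresStableInput rejection, roughly:

    // Mirrors replaceTransforms(pipeline, options) when options.isStreaming() is false.
    pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(/* streaming = */ false));
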
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java
new file mode 100644
index 0000000..2058b37
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java
@@ -0,0 +1,5 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+public class StreamingPipelineTranslator extends PipelineTranslator {
+
+}