You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/15 00:26:02 UTC

[GitHub] [beam] ibzib commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

ibzib commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r470894541



##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -71,7 +79,15 @@ public SparkPipelineRunner(SparkPipelineOptions pipelineOptions) {
 
   @Override
   public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
-    SparkBatchPortablePipelineTranslator translator = new SparkBatchPortablePipelineTranslator();
+    boolean isStreaming;
+    SparkPortablePipelineTranslator translator;
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) {

Review comment:
       Nit: reorder this to make it more readable.
   
   ```java
   boolean isStreaming = pipelineOptions.isStreaming() || hasUnboundedPCollections(pipeline);
   if (isStreaming) {
   ...
   } else {
   ...
   }
   ```

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the

Review comment:
       Change this.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, batch-interval, and other user-related knobs.
+ */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions, PipelineOptions {
+  @Description("Timeout for testing Spark portable streaming, in milliseconds.")
+  @Default.Long(-1L)
+  Long getTimeout();

Review comment:
       Pipeline options all occupy the same namespace. We should give this option a name more specific to its purpose to prevent collisions.
   
   Also, this is just personal preference, but I like when folks suffix their variable names with the units (here, `Ms` or `Millis` or `Milliseconds`) when using raw numbers as time values.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */

Review comment:
       ```suggestion
   /** Translates an unbounded portable pipeline into a Spark job. */
   ```

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();

Review comment:
       Super nit: this line doesn't deserve a newline following it.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -106,39 +123,95 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
         pipelineOptions.getFilesToStage().size());
     LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
 
+    PortablePipelineResult result;
     final JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);
+
     LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
-    AggregatorsAccumulator.init(pipelineOptions, jsc);
 
+    // Initialize accumulators.
+    AggregatorsAccumulator.init(pipelineOptions, jsc);
     MetricsEnvironment.setMetricsSupported(true);
     MetricsAccumulator.init(pipelineOptions, jsc);
 
     final SparkTranslationContext context =
-        new SparkTranslationContext(jsc, pipelineOptions, jobInfo);
+        translator.createTranslationContext(jsc, pipelineOptions, jobInfo);
     final ExecutorService executorService = Executors.newSingleThreadExecutor();
-    final Future<?> submissionFuture =
-        executorService.submit(
-            () -> {
-              translator.translate(fusedPipeline, context);
-              LOG.info(
-                  String.format(
-                      "Job %s: Pipeline translated successfully. Computing outputs",
-                      jobInfo.jobId()));
-              context.computeOutputs();
-              LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
-            });
-
-    PortablePipelineResult result =
-        new SparkPipelineResult.PortableBatchMode(submissionFuture, jsc);
+
+    LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
+
+    if (isStreaming) {
+      final JavaStreamingContext jssc =
+          ((SparkStreamingTranslationContext) context).getStreamingContext();
+
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new AggregatorsAccumulator.AccumulatorCheckpointingSparkListener()));
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
+
+      // register user-defined listeners.
+      for (JavaStreamingListener listener :
+          pipelineOptions.as(SparkContextOptions.class).getListeners()) {
+        LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
+        jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
+      }
+
+      // register Watermarks listener to broadcast the advanced WMs.
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new GlobalWatermarkHolder.WatermarkAdvancingStreamingListener()));
+
+      jssc.checkpoint(pipelineOptions.getCheckpointDir());
+
+      // Obtain timeout from options.
+      Long timeout = pipelineOptions.as(SparkPortableStreamingPipelineOptions.class).getTimeout();
+
+      final Future<?> submissionFuture =
+          executorService.submit(
+              () -> {
+                translator.translate(fusedPipeline, context);
+                LOG.info(
+                    String.format(
+                        "Job %s: Pipeline translated successfully. Computing outputs",
+                        jobInfo.jobId()));
+                context.computeOutputs();
+
+                jssc.start();
+                try {
+                  jssc.awaitTerminationOrTimeout(timeout);
+                } catch (InterruptedException e) {
+                  LOG.warn("Streaming context interrupted, shutting down.");
+                  e.printStackTrace();

Review comment:
       Prefer to go through the logger rather than using `printStackTrace`.
   
   All the `LOG` methods take an exception as an additional argument, like this: https://github.com/apache/beam/blob/e725118fb1afb6d869b9b4e19110aae26ddd26ed/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java#L71

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, batch-interval, and other user-related knobs.
+ */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions, PipelineOptions {
+  @Description("Timeout for testing Spark portable streaming, in milliseconds.")

Review comment:
       If `-1L` has a special meaning (ie infinity), document it here.

##########
File path: runners/spark/job-server/build.gradle
##########
@@ -82,53 +82,116 @@ runShadow {
     jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name) {
-  createPortableValidatesRunnerTask(
-    name: "validatesPortableRunner${name}",
-    jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
-    jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
-    testClasspathConfiguration: configurations.validatesPortableRunner,
-    numParallelTests: 4,
-    environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
-    systemProperties: [
-      "beam.spark.test.reuseSparkContext": "false",
-      "spark.ui.enabled": "false",
-      "spark.ui.showConsoleProgress": "false",
-    ],
-    testCategories: {
-      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
-      excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
-      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
-      //SplitableDoFnTests
-      excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
-    },
-    testFilter: {
-      // TODO(BEAM-10094)
-      excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
-    },
-  )
+def portableValidatesRunnerTask(String name, Boolean streaming) {
+  def pipelineOptions = []
+  if (streaming) {
+    pipelineOptions += "--streaming"
+    pipelineOptions += "--timeout=20000"
+    // exclude unsupported tests
+    createPortableValidatesRunnerTask(

Review comment:
       Since the batch and streaming configurations only differ in three fields (pipelineOptions, testCategories, and testFilter),  they should share the rest to avoid code duplication.
   
   FYI `createPortableValidatesRunnerTask` is implicitly creating a `PortableValidatesRunnerConfiguration` object, to share code you will probably have to create one explicitly.
   https://github.com/apache/beam/blob/71c7760f4b5c5bf0d91e2c8403fae99216308a3e/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L245

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, batch-interval, and other user-related knobs.
+ */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions, PipelineOptions {
+  @Description("Timeout for testing Spark portable streaming, in milliseconds.")
+  @Default.Long(-1L)

Review comment:
       Is `-1L` already treated as a special case by the Spark streaming context, or do we need to handle it specially in Beam?

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, batch-interval, and other user-related knobs.
+ */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions, PipelineOptions {

Review comment:
       We figured out we don't need to extend `PipelineOptions` directly. Remove it.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();
+    queueRDD.offer(emptyRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(queueRDD, true);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    final WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    final WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(inputValueCoder, windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            dStream,
+            inputKeyCoder,
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+    Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Coder windowCoder =
+        getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+    // TODO: handle side inputs?

Review comment:
       Create a JIRA and point to it here:
   ```suggestion
       // TODO(BEAM-XXXXX): handle side inputs.
   ```

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Instant;
+
+/**
+ * Translation context used to lazily store Spark data sets during portable pipeline translation and
+ * compute them after translation.
+ */
+public class SparkStreamingTranslationContext extends SparkTranslationContext {
+  private final JavaStreamingContext streamingContext;
+  private final Instant firstTimestamp;
+
+  public SparkStreamingTranslationContext(
+      JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo) {
+    super(jsc, options, jobInfo);
+    Duration batchDuration = new Duration(options.getBatchIntervalMillis());
+    this.streamingContext = new JavaStreamingContext(jsc, batchDuration);
+    this.firstTimestamp = new Instant();
+  }
+
+  public JavaStreamingContext getStreamingContext() {

Review comment:
       Make these methods package-private.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();
+    queueRDD.offer(emptyRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(queueRDD, true);

Review comment:
       Nit: it isn't immediately clear what is `true` here (without the help of an IDE). Use an inline comment:
   ```suggestion
           context.getStreamingContext().queueStream(queueRDD, true /* oneAtATime */);
   ```

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();
+    queueRDD.offer(emptyRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(queueRDD, true);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =

Review comment:
       This block needs a comment (though maybe this should wait until we figure out what is going on in failing tests).

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();

Review comment:
       Nit: should be named `rddQueue`.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Instant;
+
+/**
+ * Translation context used to lazily store Spark data sets during portable pipeline translation and

Review comment:
       Nit: this comment is technically accurate, but it should mention streaming.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();
+    queueRDD.offer(emptyRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(queueRDD, true);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    final WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    final WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(inputValueCoder, windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            dStream,
+            inputKeyCoder,
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+    Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Coder windowCoder =
+        getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+    // TODO: handle side inputs?
+    ImmutableMap<
+            String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>>
+        broadcastVariables = ImmutableMap.copyOf(new HashMap<>());
+
+    SparkExecutableStageFunction<InputT, SideInputT> function =
+        new SparkExecutableStageFunction<>(
+            stagePayload,
+            context.jobInfo,
+            outputMap,
+            SparkExecutableStageContextFactory.getInstance(),
+            broadcastVariables,
+            MetricsAccumulator.getInstance(),
+            windowCoder);
+    JavaDStream<RawUnionValue> staged = inputDStream.mapPartitions(function);
+
+    String intermediateId = getExecutableStageIntermediateId(transformNode);
+    context.pushDataset(
+        intermediateId,
+        new Dataset() {
+          @Override
+          public void cache(String storageLevel, Coder<?> coder) {
+            StorageLevel level = StorageLevel.fromString(storageLevel);
+            staged.persist(level);
+          }
+
+          @Override
+          public void action() {
+            // Empty function to force computation of RDD.
+            staged.foreachRDD(TranslationUtils.emptyVoidFunction());
+          }
+
+          @Override
+          public void setName(String name) {
+            // ignore
+          }
+        });
+    // pop dataset to mark RDD as used

Review comment:
       Nit:
   ```suggestion
       // pop dataset to mark DStream as used
   ```

##########
File path: runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
##########
@@ -37,7 +37,10 @@
   @Test
   public void testOptions() {
     assertEquals(
-        ImmutableList.of(SparkPipelineOptions.class, SparkStructuredStreamingPipelineOptions.class),
+        ImmutableList.of(

Review comment:
       Maybe we don't have to do it in this PR, but I'd be in favor of removing this test. It's kind of pointless to test a constant.

##########
File path: runners/spark/job-server/build.gradle
##########
@@ -82,53 +82,116 @@ runShadow {
     jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name) {
-  createPortableValidatesRunnerTask(
-    name: "validatesPortableRunner${name}",
-    jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
-    jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
-    testClasspathConfiguration: configurations.validatesPortableRunner,
-    numParallelTests: 4,
-    environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
-    systemProperties: [
-      "beam.spark.test.reuseSparkContext": "false",
-      "spark.ui.enabled": "false",
-      "spark.ui.showConsoleProgress": "false",
-    ],
-    testCategories: {
-      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
-      excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
-      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
-      //SplitableDoFnTests
-      excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
-    },
-    testFilter: {
-      // TODO(BEAM-10094)
-      excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
-    },
-  )
+def portableValidatesRunnerTask(String name, Boolean streaming) {
+  def pipelineOptions = []
+  if (streaming) {
+    pipelineOptions += "--streaming"
+    pipelineOptions += "--timeout=20000"
+    // exclude unsupported tests
+    createPortableValidatesRunnerTask(
+            name: "validatesPortableRunner${name}",
+            jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
+            jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
+            testClasspathConfiguration: configurations.validatesPortableRunner,
+            numParallelTests: 4,
+            pipelineOpts: pipelineOptions,
+            environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
+            systemProperties: [
+                    "beam.spark.test.reuseSparkContext": "false",
+                    "spark.ui.enabled": "false",
+                    "spark.ui.showConsoleProgress": "false",
+            ],
+            testCategories: {
+              includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+              excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+              excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+              //SplitableDoFnTests
+              excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
+              // Tests to exclude when streaming
+              excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'
+            },
+            testFilter: {
+              // TODO(BEAM-10094)
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
+              // Tests to exclude when streaming
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest'
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineFnsTest'
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$MultipleInputsAndOutputTests'
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$StateCoderInferenceTests'
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$StateTests'
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests'

Review comment:
       If possible, exclude test categories instead of test classes.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();
+    queueRDD.offer(emptyRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(queueRDD, true);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    final WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    final WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(inputValueCoder, windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            dStream,
+            inputKeyCoder,
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+    Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Coder windowCoder =
+        getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+    // TODO: handle side inputs?
+    ImmutableMap<
+            String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>>
+        broadcastVariables = ImmutableMap.copyOf(new HashMap<>());
+
+    SparkExecutableStageFunction<InputT, SideInputT> function =
+        new SparkExecutableStageFunction<>(
+            stagePayload,
+            context.jobInfo,
+            outputMap,
+            SparkExecutableStageContextFactory.getInstance(),
+            broadcastVariables,
+            MetricsAccumulator.getInstance(),
+            windowCoder);
+    JavaDStream<RawUnionValue> staged = inputDStream.mapPartitions(function);
+
+    String intermediateId = getExecutableStageIntermediateId(transformNode);
+    context.pushDataset(
+        intermediateId,
+        new Dataset() {
+          @Override
+          public void cache(String storageLevel, Coder<?> coder) {
+            StorageLevel level = StorageLevel.fromString(storageLevel);
+            staged.persist(level);
+          }
+
+          @Override
+          public void action() {
+            // Empty function to force computation of RDD.
+            staged.foreachRDD(TranslationUtils.emptyVoidFunction());
+          }
+
+          @Override
+          public void setName(String name) {
+            // ignore
+          }
+        });
+    // pop dataset to mark RDD as used
+    context.popDataset(intermediateId);
+
+    for (String outputId : outputs.values()) {
+      JavaDStream<WindowedValue<OutputT>> outStream =
+          staged.flatMap(new SparkExecutableStageExtractionFunction<>(outputMap.get(outputId)));
+      context.pushDataset(outputId, new UnboundedDataset<>(outStream, streamSources));
+    }
+
+    if (outputs.isEmpty()) {
+      // Add sink to ensure all outputs are computed
+      JavaDStream<WindowedValue<OutputT>> outStream =
+          staged.flatMap((rawUnionValue) -> Collections.emptyIterator());
+      context.pushDataset(
+          String.format("EmptyOutputSink_%d", context.nextSinkId()),
+          new UnboundedDataset<>(outStream, streamSources));
+    }
+  }
+
+  private static <T> void translateFlatten(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    Map<String, String> inputsMap = transformNode.getTransform().getInputsMap();
+    JavaDStream<WindowedValue<T>> unifiedStreams;
+    final List<Integer> streamSources = new ArrayList<>();
+
+    if (inputsMap.isEmpty()) {
+      Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+      q.offer(context.getSparkContext().emptyRDD());
+      unifiedStreams = context.getStreamingContext().queueStream(q);
+    } else {
+      final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
+      for (String inputId : inputsMap.values()) {
+        Dataset dataset = context.popDataset(inputId);
+        if (dataset instanceof UnboundedDataset) {
+          UnboundedDataset<T> unboundedDataset = (UnboundedDataset<T>) dataset;
+          streamSources.addAll(unboundedDataset.getStreamSources());
+          dStreams.add(unboundedDataset.getDStream());
+        } else {
+          // create a single RDD stream.
+          Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+          q.offer(((BoundedDataset) dataset).getRDD());
+          // TODO: this is not recoverable from checkpoint!
+          JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
+          dStreams.add(dStream);
+        }
+      }
+      // Unify streams into a single stream.
+      unifiedStreams = SparkCompat.joinStreams(context.getStreamingContext(), dStreams);
+    }
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(unifiedStreams, streamSources));
+  }
+
+  private static <T> void translateReshuffle(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<T> inputDataset = (UnboundedDataset<T>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<T>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<T> coder =
+        getWindowedValueCoder(inputId, pipeline.getComponents());
+
+    JavaDStream<WindowedValue<T>> reshuffledStream =
+        dStream.transform(rdd -> GroupCombineFunctions.reshuffle(rdd, coder));
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(reshuffledStream, streamSources));
+  }
+
+  private static String getInputId(PTransformNode transformNode) {
+    return Iterables.getOnlyElement(transformNode.getTransform().getInputsMap().values());
+  }
+
+  private static String getOutputId(PTransformNode transformNode) {
+    return Iterables.getOnlyElement(transformNode.getTransform().getOutputsMap().values());
+  }
+
+  private static String getExecutableStageIntermediateId(PTransformNode transformNode) {
+    return transformNode.getId();
+  }
+
+  private static <T> WindowedValue.WindowedValueCoder<T> getWindowedValueCoder(

Review comment:
       We should move these common methods into a shared class like [`PipelineTranslatorUtils`](https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java).

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue

Review comment:
       Nit: this comment is obvious from the code.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Instant;
+
+/**
+ * Translation context used to lazily store Spark data sets during portable pipeline translation and
+ * compute them after translation.
+ */
+public class SparkStreamingTranslationContext extends SparkTranslationContext {
+  private final JavaStreamingContext streamingContext;
+  private final Instant firstTimestamp;

Review comment:
       Nit: `firstTimestamp` sounds a little strange here, since it's the only timestamp around. Maybe `initialTimestamp` or something?

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =

Review comment:
       Nit: variable naming. An RDD containing an empty byte array is not empty itself.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org