Posted to commits@beam.apache.org by ib...@apache.org on 2020/08/22 16:59:49 UTC

[beam] branch master updated: [BEAM-7587] Spark portable streaming (#12157)

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 10361a3  [BEAM-7587] Spark portable streaming (#12157)
10361a3 is described below

commit 10361a3e138532cffd086c98016235c4cd2abcf8
Author: annaqin418 <an...@google.com>
AuthorDate: Sat Aug 22 12:59:05 2020 -0400

    [BEAM-7587] Spark portable streaming (#12157)
    
    [BEAM-7587] Implement Spark portable streaming.
---
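For context, a minimal sketch (not part of this commit) of how a user might opt a portable pipeline into the new streaming path: SparkPipelineRunner now selects the streaming translator when --streaming is set or the pipeline contains unbounded PCollections, and --streamingTimeoutMs comes from the SparkPortableStreamingPipelineOptions interface added below. The class name in the sketch is illustrative.

    import org.apache.beam.runners.spark.SparkPortableStreamingPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class SparkPortableStreamingOptionsSketch {
      public static void main(String[] args) {
        // --streaming routes SparkPipelineRunner to SparkStreamingPortablePipelineTranslator;
        // --streamingTimeoutMs bounds awaitTerminationOrTimeout (the default, -1, means no timeout).
        SparkPortableStreamingPipelineOptions options =
            PipelineOptionsFactory.fromArgs("--streaming", "--streamingTimeoutMs=30000")
                .as(SparkPortableStreamingPipelineOptions.class);
        System.out.println(
            "streaming=" + options.isStreaming()
                + ", timeoutMs=" + options.getStreamingTimeoutMs());
      }
    }
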
 ..._PortableValidatesRunner_Spark_Streaming.groovy |  43 +++
 .../translation/PipelineTranslatorUtils.java       |  29 ++
 runners/spark/job-server/build.gradle              | 110 +++++--
 .../beam/runners/spark/SparkPipelineResult.java    |  18 +-
 .../beam/runners/spark/SparkPipelineRunner.java    | 109 +++++--
 .../SparkPortableStreamingPipelineOptions.java     |  36 +++
 .../beam/runners/spark/SparkRunnerRegistrar.java   |   4 +-
 .../SparkBatchPortablePipelineTranslator.java      |  43 +--
 .../translation/SparkExecutableStageFunction.java  |   7 +
 .../SparkPortablePipelineTranslator.java           |  38 +++
 .../SparkStreamingPortablePipelineTranslator.java  | 360 +++++++++++++++++++++
 .../SparkStreamingTranslationContext.java          |  50 +++
 .../spark/translation/SparkTranslationContext.java |   4 +
 .../streaming/StreamingTransformTranslator.java    |   2 +-
 .../translation/streaming/UnboundedDataset.java    |   4 +-
 .../runners/spark/SparkRunnerRegistrarTest.java    |   5 +-
 .../SparkExecutableStageFunctionTest.java          |  22 +-
 17 files changed, 803 insertions(+), 81 deletions(-)

diff --git a/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Spark_Streaming.groovy b/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Spark_Streaming.groovy
new file mode 100644
index 0000000..bfc5063
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Spark_Streaming.groovy
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import PostcommitJobBuilder
+
+// This job runs the suite of Java ValidatesRunner tests against the Spark runner in streaming mode.
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_PVR_Spark_Streaming',
+    'Run Java Spark PortableValidatesRunner Streaming', 'Java Spark PortableValidatesRunner Streaming Tests', this) {
+      description('Runs the Java PortableValidatesRunner suite on the Spark runner in streaming mode.')
+
+      // Set common parameters.
+      commonJobProperties.setTopLevelMainJobProperties(delegate)
+
+      // Publish all test results to Jenkins
+      publishers {
+        archiveJunit('**/build/test-results/**/*.xml')
+      }
+
+      // Gradle goals for this job.
+      steps {
+        gradle {
+          rootBuildScriptDir(commonJobProperties.checkoutDir)
+          tasks(':runners:spark:job-server:validatesPortableRunnerStreaming')
+          commonJobProperties.setGradleSwitches(delegate)
+        }
+      }
+    }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index 1fa4ef0..4705290 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -47,6 +47,7 @@ import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBu
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableBiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.joda.time.Instant;
 
@@ -163,4 +164,32 @@ public final class PipelineTranslatorUtils {
           String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
     }
   }
+
+  public static <T> WindowedValue.WindowedValueCoder<T> getWindowedValueCoder(
+      String pCollectionId, RunnerApi.Components components) {
+    RunnerApi.PCollection pCollection = components.getPcollectionsOrThrow(pCollectionId);
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, pCollection);
+    WindowedValue.WindowedValueCoder<T> coder;
+    try {
+      coder =
+          (WindowedValue.WindowedValueCoder)
+              WireCoders.instantiateRunnerWireCoder(pCollectionNode, components);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return coder;
+  }
+
+  public static String getInputId(PipelineNode.PTransformNode transformNode) {
+    return Iterables.getOnlyElement(transformNode.getTransform().getInputsMap().values());
+  }
+
+  public static String getOutputId(PipelineNode.PTransformNode transformNode) {
+    return Iterables.getOnlyElement(transformNode.getTransform().getOutputsMap().values());
+  }
+
+  public static String getExecutableStageIntermediateId(PipelineNode.PTransformNode transformNode) {
+    return transformNode.getId();
+  }
 }
diff --git a/runners/spark/job-server/build.gradle b/runners/spark/job-server/build.gradle
index bd05d0f..cfde392 100644
--- a/runners/spark/job-server/build.gradle
+++ b/runners/spark/job-server/build.gradle
@@ -82,20 +82,16 @@ runShadow {
     jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name) {
-  createPortableValidatesRunnerTask(
-    name: "validatesPortableRunner${name}",
-    jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
-    jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
-    testClasspathConfiguration: configurations.validatesPortableRunner,
-    numParallelTests: 4,
-    environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
-    systemProperties: [
-      "beam.spark.test.reuseSparkContext": "false",
-      "spark.ui.enabled": "false",
-      "spark.ui.showConsoleProgress": "false",
-    ],
-    testCategories: {
+def portableValidatesRunnerTask(String name, Boolean streaming) {
+  def pipelineOptions = []
+  def testCategories
+  def testFilter
+
+  if (streaming) {
+    pipelineOptions += "--streaming"
+    pipelineOptions += "--streamingTimeoutMs=30000"
+
+    testCategories = {
       includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
       excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
       excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
@@ -111,24 +107,96 @@ def portableValidatesRunnerTask(String name) {
       excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
       excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
       excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
-      //SplitableDoFnTests
+      // TODO (BEAM-7222) SplittableDoFnTests
       excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
       excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
       excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
       excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
       excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
-    },
-    testFilter: {
-      // TODO(BEAM-10094)
+      // Currently unsupported in portable streaming:
+      // TODO (BEAM-10712)
+      excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'
+      // TODO (BEAM-10754)
+      excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
+      // TODO (BEAM-10755)
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
+    }
+
+    testFilter = {
+      // TODO (BEAM-10094)
       excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
-    },
-  )
+      // TODO (BEAM-10784) Currently unsupported in portable streaming:
+      // // Timeout error
+      excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedContainsInAnyOrder'
+      excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedSerializablePredicate'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testNoWindowFnDoesNotReassignWindows'
+      // // Assertion error: empty iterable output
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testFixedWindowsCombine'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleAfterFixedWindowsAndGroupByKey'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleAfterSessionsAndGroupByKey'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleAfterSlidingWindowsAndGroupByKey'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.join.CoGroupByKeyTest.testCoGroupByKeyWithWindowing'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest'
+      // // Assertion error: incorrect output
+      excludeTestsMatching 'CombineTest$BasicTests.testHotKeyCombining'
+    }
+  }
+  else {
+    testCategories = {
+      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+      excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+      // TODO (BEAM-7222) SplittableDoFnTests
+      excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
+    }
+    testFilter = {
+      // TODO (BEAM-10094)
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
+    }
+  }
+
+  createPortableValidatesRunnerTask(
+          name: "validatesPortableRunner${name}",
+          jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
+          jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
+          testClasspathConfiguration: configurations.validatesPortableRunner,
+          numParallelTests: 4,
+          pipelineOpts: pipelineOptions,
+          environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
+          systemProperties: [
+                  "beam.spark.test.reuseSparkContext": "false",
+                  "spark.ui.enabled": "false",
+                  "spark.ui.showConsoleProgress": "false",
+          ],
+          testCategories: testCategories,
+          testFilter: testFilter,
+    )
 }
 
-project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch")
+project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false)
+project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true)
 
 task validatesPortableRunner() {
   dependsOn validatesPortableRunnerBatch
+  dependsOn validatesPortableRunnerStreaming
 }
 
 def jobPort = BeamModulePlugin.startingExpansionPortNumber.getAndDecrement()
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index 44d2524..5c0656d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -37,8 +37,6 @@ import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** Represents a Spark pipeline execution result. */
 public abstract class SparkPipelineResult implements PipelineResult {
@@ -148,8 +146,6 @@ public abstract class SparkPipelineResult implements PipelineResult {
 
   static class PortableBatchMode extends BatchMode implements PortablePipelineResult {
 
-    private static final Logger LOG = LoggerFactory.getLogger(BatchMode.class);
-
     PortableBatchMode(Future<?> pipelineExecution, JavaSparkContext javaSparkContext) {
       super(pipelineExecution, javaSparkContext);
     }
@@ -213,6 +209,20 @@ public abstract class SparkPipelineResult implements PipelineResult {
     }
   }
 
+  static class PortableStreamingMode extends StreamingMode implements PortablePipelineResult {
+
+    PortableStreamingMode(Future<?> pipelineExecution, JavaStreamingContext javaStreamingContext) {
+      super(pipelineExecution, javaStreamingContext);
+    }
+
+    @Override
+    public JobApi.MetricResults portableMetrics() {
+      return JobApi.MetricResults.newBuilder()
+          .addAllAttempted(MetricsAccumulator.getInstance().value().getMonitoringInfos())
+          .build();
+    }
+  }
+
   private void offerNewState(State newState) {
     State oldState = this.state;
     this.state = newState;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 304fcc1..f33cb47 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.spark;
 
 import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;
 import static org.apache.beam.runners.spark.SparkPipelineOptions.prepareFilesToStage;
 
 import java.util.UUID;
@@ -42,7 +43,11 @@ import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.runners.spark.translation.SparkPortablePipelineTranslator;
+import org.apache.beam.runners.spark.translation.SparkStreamingPortablePipelineTranslator;
+import org.apache.beam.runners.spark.translation.SparkStreamingTranslationContext;
 import org.apache.beam.runners.spark.translation.SparkTranslationContext;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.metrics.MetricsOptions;
@@ -52,6 +57,9 @@ import org.apache.beam.sdk.options.PortablePipelineOptions.RetrievalServiceType;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
@@ -71,7 +79,13 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
 
   @Override
   public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
-    SparkBatchPortablePipelineTranslator translator = new SparkBatchPortablePipelineTranslator();
+    SparkPortablePipelineTranslator translator;
+    boolean isStreaming = pipelineOptions.isStreaming() || hasUnboundedPCollections(pipeline);
+    if (isStreaming) {
+      translator = new SparkStreamingPortablePipelineTranslator();
+    } else {
+      translator = new SparkBatchPortablePipelineTranslator();
+    }
 
     // Expand any splittable DoFns within the graph to enable sizing and splitting of bundles.
     Pipeline pipelineWithSdfExpanded =
@@ -93,6 +107,7 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
             ? trimmedPipeline
             : GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
 
+    // File staging.
     if (pipelineOptions.getFilesToStage() == null) {
       pipelineOptions.setFilesToStage(
           detectClassPathResourcesToStage(
@@ -106,30 +121,88 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
         pipelineOptions.getFilesToStage().size());
     LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
 
+    PortablePipelineResult result;
     final JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);
+
     LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
-    AggregatorsAccumulator.init(pipelineOptions, jsc);
 
+    // Initialize accumulators.
+    AggregatorsAccumulator.init(pipelineOptions, jsc);
     MetricsEnvironment.setMetricsSupported(true);
     MetricsAccumulator.init(pipelineOptions, jsc);
 
     final SparkTranslationContext context =
-        new SparkTranslationContext(jsc, pipelineOptions, jobInfo);
+        translator.createTranslationContext(jsc, pipelineOptions, jobInfo);
     final ExecutorService executorService = Executors.newSingleThreadExecutor();
-    final Future<?> submissionFuture =
-        executorService.submit(
-            () -> {
-              translator.translate(fusedPipeline, context);
-              LOG.info(
-                  String.format(
-                      "Job %s: Pipeline translated successfully. Computing outputs",
-                      jobInfo.jobId()));
-              context.computeOutputs();
-              LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
-            });
-
-    PortablePipelineResult result =
-        new SparkPipelineResult.PortableBatchMode(submissionFuture, jsc);
+
+    LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
+
+    if (isStreaming) {
+      final JavaStreamingContext jssc =
+          ((SparkStreamingTranslationContext) context).getStreamingContext();
+
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new AggregatorsAccumulator.AccumulatorCheckpointingSparkListener()));
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
+
+      // Register user-defined listeners.
+      for (JavaStreamingListener listener :
+          pipelineOptions.as(SparkContextOptions.class).getListeners()) {
+        LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
+        jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
+      }
+
+      // Register Watermarks listener to broadcast the advanced WMs.
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new GlobalWatermarkHolder.WatermarkAdvancingStreamingListener()));
+
+      jssc.checkpoint(pipelineOptions.getCheckpointDir());
+
+      // Obtain timeout from options.
+      Long timeout =
+          pipelineOptions.as(SparkPortableStreamingPipelineOptions.class).getStreamingTimeoutMs();
+
+      final Future<?> submissionFuture =
+          executorService.submit(
+              () -> {
+                translator.translate(fusedPipeline, context);
+                LOG.info(
+                    String.format(
+                        "Job %s: Pipeline translated successfully. Computing outputs",
+                        jobInfo.jobId()));
+                context.computeOutputs();
+
+                jssc.start();
+                try {
+                  jssc.awaitTerminationOrTimeout(timeout);
+                } catch (InterruptedException e) {
+                  LOG.warn("Streaming context interrupted, shutting down.", e);
+                }
+                jssc.stop();
+                LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
+              });
+      result = new SparkPipelineResult.PortableStreamingMode(submissionFuture, jssc);
+    } else {
+      final Future<?> submissionFuture =
+          executorService.submit(
+              () -> {
+                translator.translate(fusedPipeline, context);
+                LOG.info(
+                    String.format(
+                        "Job %s: Pipeline translated successfully. Computing outputs",
+                        jobInfo.jobId()));
+                context.computeOutputs();
+                LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
+              });
+      result = new SparkPipelineResult.PortableBatchMode(submissionFuture, jsc);
+    }
+    executorService.shutdown();
+    result.waitUntilFinish();
+
     MetricsPusher metricsPusher =
         new MetricsPusher(
             MetricsAccumulator.getInstance().value(),
@@ -137,8 +210,6 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
             result);
     metricsPusher.start();
 
-    result.waitUntilFinish();
-    executorService.shutdown();
     return result;
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
new file mode 100644
index 0000000..ef0e3fa
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/** Pipeline options specific to the Spark portable runner running a streaming job. */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions {
+  @Description(
+      "Timeout for Spark portable streaming, in milliseconds."
+          + "Default (-1L) is infinity, i.e. no timeout.")
+  @Default.Long(-1L)
+  Long getStreamingTimeoutMs();
+
+  void setStreamingTimeoutMs(Long value);
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 1919014..79f2a4e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -52,7 +52,9 @@ public final class SparkRunnerRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
       return ImmutableList.of(
-          SparkPipelineOptions.class, SparkStructuredStreamingPipelineOptions.class);
+          SparkPipelineOptions.class,
+          SparkStructuredStreamingPipelineOptions.class,
+          SparkPortableStreamingPipelineOptions.class);
     }
   }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
index 47e11cc..3de05e3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
@@ -18,6 +18,10 @@
 package org.apache.beam.runners.spark.translation;
 
 import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
 import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
 import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder;
 
@@ -32,16 +36,14 @@ import java.util.Set;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.NativeTransforms;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
-import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
-import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.util.ByteArray;
@@ -57,11 +59,11 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.storage.StorageLevel;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -70,7 +72,8 @@ import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /** Translates a bounded portable pipeline into a Spark job. */
-public class SparkBatchPortablePipelineTranslator {
+public class SparkBatchPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkTranslationContext> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(SparkBatchPortablePipelineTranslator.class);
@@ -84,6 +87,7 @@ public class SparkBatchPortablePipelineTranslator {
         PTransformNode transformNode, RunnerApi.Pipeline pipeline, SparkTranslationContext context);
   }
 
+  @Override
   public Set<String> knownUrns() {
     return urnToTransformTranslator.keySet();
   }
@@ -108,6 +112,7 @@ public class SparkBatchPortablePipelineTranslator {
   }
 
   /** Translates pipeline from Beam into the Spark context. */
+  @Override
   public void translate(final RunnerApi.Pipeline pipeline, SparkTranslationContext context) {
     QueryablePipeline p =
         QueryablePipeline.forTransforms(
@@ -393,30 +398,10 @@ public class SparkBatchPortablePipelineTranslator {
         : new HashPartitioner(context.getSparkContext().defaultParallelism());
   }
 
-  private static String getInputId(PTransformNode transformNode) {
-    return Iterables.getOnlyElement(transformNode.getTransform().getInputsMap().values());
-  }
-
-  private static String getOutputId(PTransformNode transformNode) {
-    return Iterables.getOnlyElement(transformNode.getTransform().getOutputsMap().values());
-  }
-
-  private static <T> WindowedValueCoder<T> getWindowedValueCoder(
-      String pCollectionId, RunnerApi.Components components) {
-    PCollection pCollection = components.getPcollectionsOrThrow(pCollectionId);
-    PCollectionNode pCollectionNode = PipelineNode.pCollection(pCollectionId, pCollection);
-    WindowedValueCoder<T> coder;
-    try {
-      coder =
-          (WindowedValueCoder) WireCoders.instantiateRunnerWireCoder(pCollectionNode, components);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return coder;
-  }
-
-  private static String getExecutableStageIntermediateId(PTransformNode transformNode) {
-    return transformNode.getId();
+  @Override
+  public SparkTranslationContext createTranslationContext(
+      JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo) {
+    return new SparkTranslationContext(jsc, options, jobInfo);
   }
 
   /** Predicate to determine whether a URN is a Spark native transform. */
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index 233e095..d28b9bd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.spark.translation;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Iterator;
 import java.util.List;
@@ -118,6 +119,12 @@ class SparkExecutableStageFunction<InputT, SideInputT>
 
   @Override
   public Iterator<RawUnionValue> call(Iterator<WindowedValue<InputT>> inputs) throws Exception {
+    // Do not call processElements if there are no inputs
+    // Otherwise, this may cause validation errors (e.g. ParDoTest)
+    if (!inputs.hasNext()) {
+      return Collections.emptyIterator();
+    }
+
     try (ExecutableStageContext stageContext = contextFactory.get(jobInfo)) {
       ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
       try (StageBundleFactory stageBundleFactory =
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPortablePipelineTranslator.java
new file mode 100644
index 0000000..3dc885a
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPortablePipelineTranslator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Interface for portable Spark translators. This allows for a uniform invocation pattern for
+ * pipeline translation between streaming and batch runners.
+ */
+public interface SparkPortablePipelineTranslator<T extends SparkTranslationContext> {
+
+  Set<String> knownUrns();
+
+  /** Translates the given pipeline. */
+  void translate(RunnerApi.Pipeline pipeline, T context);
+
+  T createTranslationContext(JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo);
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
new file mode 100644
index 0000000..48312ad
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        Collections.singletonList(
+            WindowedValue.of(
+                new byte[0],
+                BoundedWindow.TIMESTAMP_MIN_VALUE,
+                GlobalWindow.INSTANCE,
+                PaneInfo.NO_FIRING));
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+    rddQueue.offer(emptyByteArrayRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyByteArrayStream =
+        context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyByteArrayStream,
+            Collections.singletonList(emptyByteArrayStream.inputDStream().id()));
+
+    // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            inputKvCoder.getValueCoder(), windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            inputDataset.getDStream(),
+            inputKvCoder.getKeyCoder(),
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+    Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Coder windowCoder =
+        getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+    // TODO (BEAM-10712): handle side inputs.
+    if (stagePayload.getSideInputsCount() > 0) {
+      throw new UnsupportedOperationException(
+          "Side inputs to executable stage are currently unsupported.");
+    }
+    ImmutableMap<
+            String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>>
+        broadcastVariables = ImmutableMap.copyOf(new HashMap<>());
+
+    SparkExecutableStageFunction<InputT, SideInputT> function =
+        new SparkExecutableStageFunction<>(
+            stagePayload,
+            context.jobInfo,
+            outputMap,
+            SparkExecutableStageContextFactory.getInstance(),
+            broadcastVariables,
+            MetricsAccumulator.getInstance(),
+            windowCoder);
+    JavaDStream<RawUnionValue> staged = inputDStream.mapPartitions(function);
+
+    String intermediateId = getExecutableStageIntermediateId(transformNode);
+    context.pushDataset(
+        intermediateId,
+        new Dataset() {
+          @Override
+          public void cache(String storageLevel, Coder<?> coder) {
+            StorageLevel level = StorageLevel.fromString(storageLevel);
+            staged.persist(level);
+          }
+
+          @Override
+          public void action() {
+            // Empty function to force computation of RDD.
+            staged.foreachRDD(TranslationUtils.emptyVoidFunction());
+          }
+
+          @Override
+          public void setName(String name) {
+            // ignore
+          }
+        });
+    // Pop dataset to mark DStream as used
+    context.popDataset(intermediateId);
+
+    for (String outputId : outputs.values()) {
+      JavaDStream<WindowedValue<OutputT>> outStream =
+          staged.flatMap(new SparkExecutableStageExtractionFunction<>(outputMap.get(outputId)));
+      context.pushDataset(outputId, new UnboundedDataset<>(outStream, streamSources));
+    }
+
+    // Add sink to ensure stage is executed
+    if (outputs.isEmpty()) {
+      JavaDStream<WindowedValue<OutputT>> outStream =
+          staged.flatMap((rawUnionValue) -> Collections.emptyIterator());
+      context.pushDataset(
+          String.format("EmptyOutputSink_%d", context.nextSinkId()),
+          new UnboundedDataset<>(outStream, streamSources));
+    }
+  }
+
+  private static <T> void translateFlatten(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    Map<String, String> inputsMap = transformNode.getTransform().getInputsMap();
+    JavaDStream<WindowedValue<T>> unifiedStreams;
+    List<Integer> streamSources = new ArrayList<>();
+
+    if (inputsMap.isEmpty()) {
+      Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+      q.offer(context.getSparkContext().emptyRDD());
+      unifiedStreams = context.getStreamingContext().queueStream(q);
+    } else {
+      List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
+      for (String inputId : inputsMap.values()) {
+        Dataset dataset = context.popDataset(inputId);
+        if (dataset instanceof UnboundedDataset) {
+          UnboundedDataset<T> unboundedDataset = (UnboundedDataset<T>) dataset;
+          streamSources.addAll(unboundedDataset.getStreamSources());
+          dStreams.add(unboundedDataset.getDStream());
+        } else {
+          // create a single RDD stream.
+          Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+          q.offer(((BoundedDataset) dataset).getRDD());
+          // TODO (BEAM-10789): this is not recoverable from checkpoint!
+          JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
+          dStreams.add(dStream);
+        }
+      }
+      // Unify streams into a single stream.
+      unifiedStreams = SparkCompat.joinStreams(context.getStreamingContext(), dStreams);
+    }
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(unifiedStreams, streamSources));
+  }
+
+  private static <T> void translateReshuffle(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<T> inputDataset = (UnboundedDataset<T>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<T>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<T> coder =
+        getWindowedValueCoder(inputId, pipeline.getComponents());
+
+    JavaDStream<WindowedValue<T>> reshuffledStream =
+        dStream.transform(rdd -> GroupCombineFunctions.reshuffle(rdd, coder));
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(reshuffledStream, streamSources));
+  }
+
+  @Override
+  public SparkStreamingTranslationContext createTranslationContext(
+      JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo) {
+    return new SparkStreamingTranslationContext(jsc, options, jobInfo);
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
new file mode 100644
index 0000000..7e20b21
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Instant;
+
+/**
+ * Translation context used to lazily store Spark datasets during streaming portable pipeline
+ * translation and compute them after translation.
+ */
+public class SparkStreamingTranslationContext extends SparkTranslationContext {
+  private final JavaStreamingContext streamingContext;
+  private final Instant firstTimestamp;
+
+  public SparkStreamingTranslationContext(
+      JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo) {
+    super(jsc, options, jobInfo);
+    Duration batchDuration = new Duration(options.getBatchIntervalMillis());
+    this.streamingContext = new JavaStreamingContext(jsc, batchDuration);
+    this.firstTimestamp = new Instant();
+  }
+
+  public JavaStreamingContext getStreamingContext() {
+    return streamingContext;
+  }
+
+  public Instant getFirstTimestamp() {
+    return firstTimestamp;
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkTranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkTranslationContext.java
index d7cc5e9..a96a784 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkTranslationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkTranslationContext.java
@@ -55,6 +55,10 @@ public class SparkTranslationContext {
     return jsc;
   }
 
+  public SerializablePipelineOptions getSerializableOptions() {
+    return serializablePipelineOptions;
+  }
+
   /** Add output of transform to context. */
   public void pushDataset(String pCollectionId, Dataset dataset) {
     dataset.setName(pCollectionId);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 6ce3644..9b45f05 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -237,7 +237,7 @@ public final class StreamingTransformTranslator {
             // create a single RDD stream.
             Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
             q.offer(((BoundedDataset) dataset).getRDD());
-            // TODO: this is not recoverable from checkpoint!
+            // TODO (BEAM-10789): this is not recoverable from checkpoint!
             JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
             dStreams.add(dStream);
           }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index f59ee2b..ec7e00d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -48,11 +48,11 @@ public class UnboundedDataset<T> implements Dataset {
     this.streamSources.addAll(streamSources);
   }
 
-  JavaDStream<WindowedValue<T>> getDStream() {
+  public JavaDStream<WindowedValue<T>> getDStream() {
     return dStream;
   }
 
-  List<Integer> getStreamSources() {
+  public List<Integer> getStreamSources() {
     return streamSources;
   }
 
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 6b273d0..652bb8f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -37,7 +37,10 @@ public class SparkRunnerRegistrarTest {
   @Test
   public void testOptions() {
     assertEquals(
-        ImmutableList.of(SparkPipelineOptions.class, SparkStructuredStreamingPipelineOptions.class),
+        ImmutableList.of(
+            SparkPipelineOptions.class,
+            SparkStructuredStreamingPipelineOptions.class,
+            SparkPortableStreamingPipelineOptions.class),
         new SparkRunnerRegistrar.Options().getPipelineOptions());
   }
 
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
index 061f5c0..39e28be 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
@@ -24,11 +24,14 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
@@ -101,7 +104,9 @@ public class SparkExecutableStageFunctionTest {
   public void sdkErrorsSurfaceOnClose() throws Exception {
     SparkExecutableStageFunction<Integer, ?> function = getFunction(Collections.emptyMap());
     doThrow(new Exception()).when(remoteBundle).close();
-    function.call(Collections.emptyIterator());
+    List<WindowedValue<Integer>> inputs = new ArrayList<>();
+    inputs.add(WindowedValue.valueInGlobalWindow(0));
+    function.call(inputs.iterator());
   }
 
   @Test
@@ -205,7 +210,9 @@ public class SparkExecutableStageFunctionTest {
     when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
 
     SparkExecutableStageFunction<Integer, ?> function = getFunction(outputTagMap);
-    Iterator<RawUnionValue> iterator = function.call(Collections.emptyIterator());
+    List<WindowedValue<Integer>> inputs = new ArrayList<>();
+    inputs.add(WindowedValue.valueInGlobalWindow(0));
+    Iterator<RawUnionValue> iterator = function.call(inputs.iterator());
     Iterable<RawUnionValue> iterable = () -> iterator;
 
     assertThat(
@@ -217,13 +224,22 @@ public class SparkExecutableStageFunctionTest {
   @Test
   public void testStageBundleClosed() throws Exception {
     SparkExecutableStageFunction<Integer, ?> function = getFunction(Collections.emptyMap());
-    function.call(Collections.emptyIterator());
+    List<WindowedValue<Integer>> inputs = new ArrayList<>();
+    inputs.add(WindowedValue.valueInGlobalWindow(0));
+    function.call(inputs.iterator());
     verify(stageBundleFactory).getBundle(any(), any(), any(), any());
     verify(stageBundleFactory).getProcessBundleDescriptor();
     verify(stageBundleFactory).close();
     verifyNoMoreInteractions(stageBundleFactory);
   }
 
+  @Test
+  public void testNoCallOnEmptyInputIterator() throws Exception {
+    SparkExecutableStageFunction<Integer, ?> function = getFunction(Collections.emptyMap());
+    function.call(Collections.emptyIterator());
+    verifyZeroInteractions(stageBundleFactory);
+  }
+
   private <InputT, SideInputT> SparkExecutableStageFunction<InputT, SideInputT> getFunction(
       Map<String, Integer> outputMap) {
     return new SparkExecutableStageFunction<>(