Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/01 22:28:43 UTC

[GitHub] [beam] annaqin418 opened a new pull request #12157: [BEAM-7587] Spark portable streaming

annaqin418 opened a new pull request #12157:
URL: https://github.com/apache/beam/pull/12157


   These changes add support for streaming data on the portable Spark runner.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   





[GitHub] [beam] annaqin418 commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
annaqin418 commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-652678870


   R: @ibzib 
   Creating this PR for now to track changes as I go. I set up some of the new interfaces; could you check whether they look reasonable? I currently have the streaming context as a subclass of the original Spark translation context, but on further thought I'm not sure that totally works, and it might be better to have them both implement an interface instead?
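
   A minimal sketch of the interface-based alternative mentioned above, assuming both contexts mainly need to expose a Spark context; the class and method names here are hypothetical, not the ones added in this PR:

   ```java
   import org.apache.spark.api.java.JavaSparkContext;
   import org.apache.spark.streaming.api.java.JavaStreamingContext;

   // Hypothetical sketch: both translation contexts implement a shared interface
   // instead of the streaming context subclassing the batch one.
   interface PortableTranslationContextSketch {
     JavaSparkContext getSparkContext();
   }

   class BatchContextSketch implements PortableTranslationContextSketch {
     private final JavaSparkContext jsc;

     BatchContextSketch(JavaSparkContext jsc) {
       this.jsc = jsc;
     }

     @Override
     public JavaSparkContext getSparkContext() {
       return jsc;
     }
   }

   class StreamingContextSketch implements PortableTranslationContextSketch {
     private final JavaStreamingContext jssc;

     StreamingContextSketch(JavaStreamingContext jssc) {
       this.jssc = jssc;
     }

     @Override
     public JavaSparkContext getSparkContext() {
       // JavaStreamingContext wraps a JavaSparkContext, so both contexts can
       // satisfy the same interface.
       return jssc.sparkContext();
     }

     JavaStreamingContext getStreamingContext() {
       return jssc;
     }
   }
   ```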





[GitHub] [beam] annaqin418 commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
annaqin418 commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-674267670


   R: @ibzib @robertwb 
   I still need to document all of the limitations (e.g. side inputs, state/timers), but this code is ready for review.





[GitHub] [beam] ibzib commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-678590499


   run seed job





[GitHub] [beam] ibzib commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-674273597


   Run Java Spark PortableValidatesRunner Streaming





[GitHub] [beam] annaqin418 commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
annaqin418 commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r450281739



##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkTranslationContext.java
##########
@@ -35,15 +35,15 @@
  * compute them after translation.
  */
 public class SparkTranslationContext {
-  private final JavaSparkContext jsc;
+  protected final JavaSparkContext jsc;

Review comment:
       You're right, they shouldn't be; I got confused by the inheritance.







[GitHub] [beam] annaqin418 commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
annaqin418 commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-678534421


   Run Java Spark PortableValidatesRunner Streaming





[GitHub] [beam] annaqin418 commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
annaqin418 commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-677947945


   run seed job





[GitHub] [beam] ibzib commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r474316833



##########
File path: runners/spark/job-server/build.gradle
##########
@@ -82,20 +82,68 @@ runShadow {
     jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name) {
-  createPortableValidatesRunnerTask(
-    name: "validatesPortableRunner${name}",
-    jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
-    jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
-    testClasspathConfiguration: configurations.validatesPortableRunner,
-    numParallelTests: 4,
-    environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
-    systemProperties: [
-      "beam.spark.test.reuseSparkContext": "false",
-      "spark.ui.enabled": "false",
-      "spark.ui.showConsoleProgress": "false",
-    ],
-    testCategories: {
+def portableValidatesRunnerTask(String name, Boolean streaming) {
+  def pipelineOptions = []
+  def testCategories
+  def testFilter
+
+  if (streaming) {
+    pipelineOptions += "--streaming"
+    pipelineOptions += "--streamingTimeoutMs=30000"
+
+    testCategories = {
+      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+      excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+      // SplittableDoFnTests
+      excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
+      // Currently unsupported in portable streaming:
+      // TODO (BEAM-10712)
+      excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'
+      // TODO (BEAM-10754)
+      excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
+      // TODO (BEAM-10755)
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
+    }
+
+    testFilter = {
+      // TODO(BEAM-10094)
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
+      // Currently unsupported in portable streaming:

Review comment:
       We should file a JIRA for all these (since they are conspicuously all windowing related).

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPortablePipelineTranslator.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Interface for portable Spark translators. This allows for a uniform invocation pattern for
+ * pipeline translation between streaming and portable runners.

Review comment:
       "streaming and portable" should be "streaming and batch."

##########
File path: runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
##########
@@ -101,7 +104,9 @@ public void setUpMocks() throws Exception {
   public void sdkErrorsSurfaceOnClose() throws Exception {
     SparkExecutableStageFunction<Integer, ?> function = getFunction(Collections.emptyMap());
     doThrow(new Exception()).when(remoteBundle).close();
-    function.call(Collections.emptyIterator());
+    List<WindowedValue<Integer>> dummyList = new ArrayList<>();

Review comment:
       Nit: "dummy" sounds a lot like mocks or stubs, which are technical terms that describe the implementation of test objects. This list isn't a "dummy" in that sense; it's a real list, albeit one constructed for test purposes.
   
   tl;dr: rename this to `inputs`.

##########
File path: runners/spark/job-server/build.gradle
##########
@@ -82,20 +82,68 @@ runShadow {
     jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name) {
-  createPortableValidatesRunnerTask(
-    name: "validatesPortableRunner${name}",
-    jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
-    jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
-    testClasspathConfiguration: configurations.validatesPortableRunner,
-    numParallelTests: 4,
-    environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
-    systemProperties: [
-      "beam.spark.test.reuseSparkContext": "false",
-      "spark.ui.enabled": "false",
-      "spark.ui.showConsoleProgress": "false",
-    ],
-    testCategories: {
+def portableValidatesRunnerTask(String name, Boolean streaming) {
+  def pipelineOptions = []
+  def testCategories
+  def testFilter
+
+  if (streaming) {
+    pipelineOptions += "--streaming"
+    pipelineOptions += "--streamingTimeoutMs=30000"
+
+    testCategories = {
+      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+      excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+      // SplittableDoFnTests

Review comment:
       This should've been there before, but the JIRA for this (BEAM-7222) should be linked.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
##########
@@ -118,6 +118,12 @@
 
   @Override
   public Iterator<RawUnionValue> call(Iterator<WindowedValue<InputT>> inputs) throws Exception {
+    // Do not call processElements if there are no inputs
+    // Otherwise, this may cause validation errors (e.g. ParDoTest)
+    if (!inputs.hasNext()) {
+      return new ConcurrentLinkedQueue<RawUnionValue>().iterator();

Review comment:
       `Collections.emptyIterator()` has the same behavior, but is more concise.
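
   A minimal sketch of that suggestion applied to the empty-input guard; the class below is a simplified stand-in for `SparkExecutableStageFunction`, not the real implementation:

   ```java
   import java.util.Collections;
   import java.util.Iterator;

   // Simplified stand-in: when a bundle has no inputs, return an empty iterator
   // directly rather than building one from a throwaway ConcurrentLinkedQueue.
   final class EmptyBundleGuardSketch<InputT, OutputT> {

     Iterator<OutputT> call(Iterator<InputT> inputs) {
       if (!inputs.hasNext()) {
         return Collections.emptyIterator();
       }
       return processElements(inputs);
     }

     private Iterator<OutputT> processElements(Iterator<InputT> inputs) {
       // Placeholder for the real per-bundle processing.
       return Collections.emptyIterator();
     }
   }
   ```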







[GitHub] [beam] ibzib commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r470894541



##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -71,7 +79,15 @@ public SparkPipelineRunner(SparkPipelineOptions pipelineOptions) {
 
   @Override
   public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
-    SparkBatchPortablePipelineTranslator translator = new SparkBatchPortablePipelineTranslator();
+    boolean isStreaming;
+    SparkPortablePipelineTranslator translator;
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) {

Review comment:
       Nit: reorder this to make it more readable.
   
   ```java
   boolean isStreaming = pipelineOptions.isStreaming() || hasUnboundedPCollections(pipeline);
   if (isStreaming) {
   ...
   } else {
   ...
   }
   ```

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the

Review comment:
       Change this description; it describes the general Spark runner options rather than this interface.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, batch-interval, and other user-related knobs.
+ */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions, PipelineOptions {
+  @Description("Timeout for testing Spark portable streaming, in milliseconds.")
+  @Default.Long(-1L)
+  Long getTimeout();

Review comment:
       Pipeline options all occupy the same namespace. We should give this option a name more specific to its purpose to prevent collisions.
   
   Also, this is just personal preference, but I like when folks suffix their variable names with the units (here, `Ms` or `Millis` or `Milliseconds`) when using raw numbers as time values.
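
   A sketch of what the renamed option could look like, following the `--streamingTimeoutMs` flag that appears in the updated build.gradle diff elsewhere in this thread; the interface name is illustrative and the `-1` default is assumed to mean "wait indefinitely":

   ```java
   import org.apache.beam.sdk.options.Default;
   import org.apache.beam.sdk.options.Description;
   import org.apache.beam.sdk.options.PortablePipelineOptions;

   // Hypothetical sketch of the option with a more specific name and a units suffix.
   public interface StreamingTimeoutOptionsSketch extends PortablePipelineOptions {

     @Description(
         "Maximum time to wait for the streaming context to terminate, in milliseconds. "
             + "A value of -1 means wait indefinitely.")
     @Default.Long(-1L)
     Long getStreamingTimeoutMs();

     void setStreamingTimeoutMs(Long streamingTimeoutMs);
   }
   ```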

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */

Review comment:
       ```suggestion
   /** Translates an unbounded portable pipeline into a Spark job. */
   ```

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();

Review comment:
       Super nit: this line doesn't deserve a newline following it.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -106,39 +123,95 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
         pipelineOptions.getFilesToStage().size());
     LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
 
+    PortablePipelineResult result;
     final JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);
+
     LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
-    AggregatorsAccumulator.init(pipelineOptions, jsc);
 
+    // Initialize accumulators.
+    AggregatorsAccumulator.init(pipelineOptions, jsc);
     MetricsEnvironment.setMetricsSupported(true);
     MetricsAccumulator.init(pipelineOptions, jsc);
 
     final SparkTranslationContext context =
-        new SparkTranslationContext(jsc, pipelineOptions, jobInfo);
+        translator.createTranslationContext(jsc, pipelineOptions, jobInfo);
     final ExecutorService executorService = Executors.newSingleThreadExecutor();
-    final Future<?> submissionFuture =
-        executorService.submit(
-            () -> {
-              translator.translate(fusedPipeline, context);
-              LOG.info(
-                  String.format(
-                      "Job %s: Pipeline translated successfully. Computing outputs",
-                      jobInfo.jobId()));
-              context.computeOutputs();
-              LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
-            });
-
-    PortablePipelineResult result =
-        new SparkPipelineResult.PortableBatchMode(submissionFuture, jsc);
+
+    LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
+
+    if (isStreaming) {
+      final JavaStreamingContext jssc =
+          ((SparkStreamingTranslationContext) context).getStreamingContext();
+
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new AggregatorsAccumulator.AccumulatorCheckpointingSparkListener()));
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
+
+      // register user-defined listeners.
+      for (JavaStreamingListener listener :
+          pipelineOptions.as(SparkContextOptions.class).getListeners()) {
+        LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
+        jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
+      }
+
+      // register Watermarks listener to broadcast the advanced WMs.
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new GlobalWatermarkHolder.WatermarkAdvancingStreamingListener()));
+
+      jssc.checkpoint(pipelineOptions.getCheckpointDir());
+
+      // Obtain timeout from options.
+      Long timeout = pipelineOptions.as(SparkPortableStreamingPipelineOptions.class).getTimeout();
+
+      final Future<?> submissionFuture =
+          executorService.submit(
+              () -> {
+                translator.translate(fusedPipeline, context);
+                LOG.info(
+                    String.format(
+                        "Job %s: Pipeline translated successfully. Computing outputs",
+                        jobInfo.jobId()));
+                context.computeOutputs();
+
+                jssc.start();
+                try {
+                  jssc.awaitTerminationOrTimeout(timeout);
+                } catch (InterruptedException e) {
+                  LOG.warn("Streaming context interrupted, shutting down.");
+                  e.printStackTrace();

Review comment:
       Prefer to go through the logger rather than using `printStackTrace`.
   
   All the `LOG` methods take an exception as an additional argument, like this: https://github.com/apache/beam/blob/e725118fb1afb6d869b9b4e19110aae26ddd26ed/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java#L71
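
   A minimal sketch of the catch block with that suggestion applied; the wrapper class and method below are hypothetical and only illustrate passing the exception to the logger:

   ```java
   import org.apache.spark.streaming.api.java.JavaStreamingContext;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Hypothetical wrapper; only the catch block mirrors the suggested change.
   final class AwaitTerminationSketch {

     private static final Logger LOG = LoggerFactory.getLogger(AwaitTerminationSketch.class);

     static void awaitOrTimeout(JavaStreamingContext jssc, long timeoutMs) {
       try {
         jssc.awaitTerminationOrTimeout(timeoutMs);
       } catch (InterruptedException e) {
         // Passing the exception as the last argument lets SLF4J log the stack trace.
         LOG.warn("Streaming context interrupted, shutting down.", e);
       }
     }
   }
   ```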

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, batch-interval, and other user-related knobs.
+ */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions, PipelineOptions {
+  @Description("Timeout for testing Spark portable streaming, in milliseconds.")

Review comment:
       If `-1L` has a special meaning (i.e. infinity), document it here.

##########
File path: runners/spark/job-server/build.gradle
##########
@@ -82,53 +82,116 @@ runShadow {
     jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name) {
-  createPortableValidatesRunnerTask(
-    name: "validatesPortableRunner${name}",
-    jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
-    jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
-    testClasspathConfiguration: configurations.validatesPortableRunner,
-    numParallelTests: 4,
-    environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
-    systemProperties: [
-      "beam.spark.test.reuseSparkContext": "false",
-      "spark.ui.enabled": "false",
-      "spark.ui.showConsoleProgress": "false",
-    ],
-    testCategories: {
-      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
-      excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
-      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
-      //SplitableDoFnTests
-      excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
-    },
-    testFilter: {
-      // TODO(BEAM-10094)
-      excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
-    },
-  )
+def portableValidatesRunnerTask(String name, Boolean streaming) {
+  def pipelineOptions = []
+  if (streaming) {
+    pipelineOptions += "--streaming"
+    pipelineOptions += "--timeout=20000"
+    // exclude unsupported tests
+    createPortableValidatesRunnerTask(

Review comment:
       Since the batch and streaming configurations only differ in three fields (pipelineOptions, testCategories, and testFilter),  they should share the rest to avoid code duplication.
   
   FYI `createPortableValidatesRunnerTask` is implicitly creating a `PortableValidatesRunnerConfiguration` object, to share code you will probably have to create one explicitly.
   https://github.com/apache/beam/blob/71c7760f4b5c5bf0d91e2c8403fae99216308a3e/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L245

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, batch-interval, and other user-related knobs.
+ */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions, PipelineOptions {
+  @Description("Timeout for testing Spark portable streaming, in milliseconds.")
+  @Default.Long(-1L)

Review comment:
       Is `-1L` already treated as a special case by the Spark streaming context, or do we need to handle it specially in Beam?

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, batch-interval, and other user-related knobs.
+ */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions, PipelineOptions {

Review comment:
       We figured out we don't need to extend `PipelineOptions` directly. Remove it.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates a bounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();
+    queueRDD.offer(emptyRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(queueRDD, true);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    final WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    final WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(inputValueCoder, windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            dStream,
+            inputKeyCoder,
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+    Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Coder windowCoder =
+        getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+    // TODO: handle side inputs?

Review comment:
       Create a JIRA and point to it here:
   ```suggestion
       // TODO(BEAM-XXXXX): handle side inputs.
   ```

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Instant;
+
+/**
+ * Translation context used to lazily store Spark data sets during portable pipeline translation and
+ * compute them after translation.
+ */
+public class SparkStreamingTranslationContext extends SparkTranslationContext {
+  private final JavaStreamingContext streamingContext;
+  private final Instant firstTimestamp;
+
+  public SparkStreamingTranslationContext(
+      JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo) {
+    super(jsc, options, jobInfo);
+    Duration batchDuration = new Duration(options.getBatchIntervalMillis());
+    this.streamingContext = new JavaStreamingContext(jsc, batchDuration);
+    this.firstTimestamp = new Instant();
+  }
+
+  public JavaStreamingContext getStreamingContext() {

Review comment:
       Make these methods package-private.
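       A minimal sketch of what that could look like (assuming these accessors are only needed by the translators in the same `org.apache.beam.runners.spark.translation` package):
   ```java
   // Package-private: visible to SparkStreamingPortablePipelineTranslator in this
   // package, but not exposed as public API of the translation context.
   JavaStreamingContext getStreamingContext() {
     return streamingContext;
   }

   Instant getFirstTimestamp() {
     return firstTimestamp;
   }
   ```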

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();
+    queueRDD.offer(emptyRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(queueRDD, true);

Review comment:
       Nit: it isn't immediately clear what `true` means here (without the help of an IDE). Use an inline comment:
   ```suggestion
           context.getStreamingContext().queueStream(queueRDD, true /* oneAtATime */);
   ```

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();
+    queueRDD.offer(emptyRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(queueRDD, true);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =

Review comment:
       This block needs a comment (though maybe this should wait until we figure out what is going on in the failing tests).
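       One possible comment, assuming the intent here is that Impulse emits its single element right away and the source is then complete:
   ```java
   // Assumed intent: Impulse emits exactly one element and is then done, so advance
   // this source's watermark straight to the end of the global window; downstream
   // windowing/triggering on the impulse output need not wait for later batches.
   GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
       new GlobalWatermarkHolder.SparkWatermarks(
           GlobalWindow.INSTANCE.maxTimestamp(),
           BoundedWindow.TIMESTAMP_MAX_VALUE,
           context.getFirstTimestamp());
   GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
   ```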

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();

Review comment:
       Nit: should be named `rddQueue`.
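       For example (a pure rename, shown together with the `oneAtATime` inline comment suggested earlier):
   ```java
   // Create the input DStream from a queue holding the single impulse RDD.
   Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
   rddQueue.offer(emptyRDD);
   JavaInputDStream<WindowedValue<byte[]>> emptyStream =
       context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
   ```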

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Instant;
+
+/**
+ * Translation context used to lazily store Spark data sets during portable pipeline translation and

Review comment:
       Nit: this comment is technically accurate, but it should mention streaming.
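       E.g. one possible wording:
   ```java
   /**
    * Translation context used to lazily store Spark datasets during streaming portable pipeline
    * translation and to compute them after translation.
    */
   ```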

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();
+    queueRDD.offer(emptyRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(queueRDD, true);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    final WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    final WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(inputValueCoder, windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            dStream,
+            inputKeyCoder,
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+    Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Coder windowCoder =
+        getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+    // TODO: handle side inputs?
+    ImmutableMap<
+            String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>>
+        broadcastVariables = ImmutableMap.copyOf(new HashMap<>());
+
+    SparkExecutableStageFunction<InputT, SideInputT> function =
+        new SparkExecutableStageFunction<>(
+            stagePayload,
+            context.jobInfo,
+            outputMap,
+            SparkExecutableStageContextFactory.getInstance(),
+            broadcastVariables,
+            MetricsAccumulator.getInstance(),
+            windowCoder);
+    JavaDStream<RawUnionValue> staged = inputDStream.mapPartitions(function);
+
+    String intermediateId = getExecutableStageIntermediateId(transformNode);
+    context.pushDataset(
+        intermediateId,
+        new Dataset() {
+          @Override
+          public void cache(String storageLevel, Coder<?> coder) {
+            StorageLevel level = StorageLevel.fromString(storageLevel);
+            staged.persist(level);
+          }
+
+          @Override
+          public void action() {
+            // Empty function to force computation of RDD.
+            staged.foreachRDD(TranslationUtils.emptyVoidFunction());
+          }
+
+          @Override
+          public void setName(String name) {
+            // ignore
+          }
+        });
+    // pop dataset to mark RDD as used

Review comment:
       Nit:
   ```suggestion
       // pop dataset to mark DStream as used
   ```

##########
File path: runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
##########
@@ -37,7 +37,10 @@
   @Test
   public void testOptions() {
     assertEquals(
-        ImmutableList.of(SparkPipelineOptions.class, SparkStructuredStreamingPipelineOptions.class),
+        ImmutableList.of(

Review comment:
       Maybe we don't have to do it in this PR, but I'd be in favor of removing this test. It's kind of pointless to test a constant.

##########
File path: runners/spark/job-server/build.gradle
##########
@@ -82,53 +82,116 @@ runShadow {
     jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name) {
-  createPortableValidatesRunnerTask(
-    name: "validatesPortableRunner${name}",
-    jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
-    jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
-    testClasspathConfiguration: configurations.validatesPortableRunner,
-    numParallelTests: 4,
-    environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
-    systemProperties: [
-      "beam.spark.test.reuseSparkContext": "false",
-      "spark.ui.enabled": "false",
-      "spark.ui.showConsoleProgress": "false",
-    ],
-    testCategories: {
-      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
-      excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
-      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
-      //SplitableDoFnTests
-      excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
-    },
-    testFilter: {
-      // TODO(BEAM-10094)
-      excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
-    },
-  )
+def portableValidatesRunnerTask(String name, Boolean streaming) {
+  def pipelineOptions = []
+  if (streaming) {
+    pipelineOptions += "--streaming"
+    pipelineOptions += "--timeout=20000"
+    // exclude unsupported tests
+    createPortableValidatesRunnerTask(
+            name: "validatesPortableRunner${name}",
+            jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
+            jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
+            testClasspathConfiguration: configurations.validatesPortableRunner,
+            numParallelTests: 4,
+            pipelineOpts: pipelineOptions,
+            environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
+            systemProperties: [
+                    "beam.spark.test.reuseSparkContext": "false",
+                    "spark.ui.enabled": "false",
+                    "spark.ui.showConsoleProgress": "false",
+            ],
+            testCategories: {
+              includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+              excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+              excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+              //SplitableDoFnTests
+              excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
+              excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
+              // Tests to exclude when streaming
+              excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'
+            },
+            testFilter: {
+              // TODO(BEAM-10094)
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
+              // Tests to exclude when streaming
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest'
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineFnsTest'
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$MultipleInputsAndOutputTests'
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$StateCoderInferenceTests'
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$StateTests'
+              excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests'

Review comment:
       If possible, exclude test categories instead of test classes.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue
+    Queue<JavaRDD<WindowedValue<byte[]>>> queueRDD = new LinkedBlockingQueue<>();
+    queueRDD.offer(emptyRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(queueRDD, true);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    final WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    final WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(inputValueCoder, windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            dStream,
+            inputKeyCoder,
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+    Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Coder windowCoder =
+        getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+    // TODO: handle side inputs?
+    ImmutableMap<
+            String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>>
+        broadcastVariables = ImmutableMap.copyOf(new HashMap<>());
+
+    SparkExecutableStageFunction<InputT, SideInputT> function =
+        new SparkExecutableStageFunction<>(
+            stagePayload,
+            context.jobInfo,
+            outputMap,
+            SparkExecutableStageContextFactory.getInstance(),
+            broadcastVariables,
+            MetricsAccumulator.getInstance(),
+            windowCoder);
+    JavaDStream<RawUnionValue> staged = inputDStream.mapPartitions(function);
+
+    String intermediateId = getExecutableStageIntermediateId(transformNode);
+    context.pushDataset(
+        intermediateId,
+        new Dataset() {
+          @Override
+          public void cache(String storageLevel, Coder<?> coder) {
+            StorageLevel level = StorageLevel.fromString(storageLevel);
+            staged.persist(level);
+          }
+
+          @Override
+          public void action() {
+            // Empty function to force computation of RDD.
+            staged.foreachRDD(TranslationUtils.emptyVoidFunction());
+          }
+
+          @Override
+          public void setName(String name) {
+            // ignore
+          }
+        });
+    // pop dataset to mark RDD as used
+    context.popDataset(intermediateId);
+
+    for (String outputId : outputs.values()) {
+      JavaDStream<WindowedValue<OutputT>> outStream =
+          staged.flatMap(new SparkExecutableStageExtractionFunction<>(outputMap.get(outputId)));
+      context.pushDataset(outputId, new UnboundedDataset<>(outStream, streamSources));
+    }
+
+    if (outputs.isEmpty()) {
+      // Add sink to ensure all outputs are computed
+      JavaDStream<WindowedValue<OutputT>> outStream =
+          staged.flatMap((rawUnionValue) -> Collections.emptyIterator());
+      context.pushDataset(
+          String.format("EmptyOutputSink_%d", context.nextSinkId()),
+          new UnboundedDataset<>(outStream, streamSources));
+    }
+  }
+
+  private static <T> void translateFlatten(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    Map<String, String> inputsMap = transformNode.getTransform().getInputsMap();
+    JavaDStream<WindowedValue<T>> unifiedStreams;
+    final List<Integer> streamSources = new ArrayList<>();
+
+    if (inputsMap.isEmpty()) {
+      Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+      q.offer(context.getSparkContext().emptyRDD());
+      unifiedStreams = context.getStreamingContext().queueStream(q);
+    } else {
+      final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
+      for (String inputId : inputsMap.values()) {
+        Dataset dataset = context.popDataset(inputId);
+        if (dataset instanceof UnboundedDataset) {
+          UnboundedDataset<T> unboundedDataset = (UnboundedDataset<T>) dataset;
+          streamSources.addAll(unboundedDataset.getStreamSources());
+          dStreams.add(unboundedDataset.getDStream());
+        } else {
+          // create a single RDD stream.
+          Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+          q.offer(((BoundedDataset) dataset).getRDD());
+          // TODO: this is not recoverable from checkpoint!
+          JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
+          dStreams.add(dStream);
+        }
+      }
+      // Unify streams into a single stream.
+      unifiedStreams = SparkCompat.joinStreams(context.getStreamingContext(), dStreams);
+    }
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(unifiedStreams, streamSources));
+  }
+
+  private static <T> void translateReshuffle(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<T> inputDataset = (UnboundedDataset<T>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<T>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<T> coder =
+        getWindowedValueCoder(inputId, pipeline.getComponents());
+
+    JavaDStream<WindowedValue<T>> reshuffledStream =
+        dStream.transform(rdd -> GroupCombineFunctions.reshuffle(rdd, coder));
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(reshuffledStream, streamSources));
+  }
+
+  private static String getInputId(PTransformNode transformNode) {
+    return Iterables.getOnlyElement(transformNode.getTransform().getInputsMap().values());
+  }
+
+  private static String getOutputId(PTransformNode transformNode) {
+    return Iterables.getOnlyElement(transformNode.getTransform().getOutputsMap().values());
+  }
+
+  private static String getExecutableStageIntermediateId(PTransformNode transformNode) {
+    return transformNode.getId();
+  }
+
+  private static <T> WindowedValue.WindowedValueCoder<T> getWindowedValueCoder(

Review comment:
       We should move these common methods into a shared class like [`PipelineTranslatorUtils`](https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java).
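
    For context, a rough sketch of what those helpers could look like once hoisted into a shared utility class; the bodies below simply mirror the private methods quoted above, and the final home and signatures may differ:

    ```java
    import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
    import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;

    public final class PipelineTranslatorUtilsSketch {
      private PipelineTranslatorUtilsSketch() {}

      // Single input PCollection id of a transform node.
      public static String getInputId(PTransformNode transformNode) {
        return Iterables.getOnlyElement(transformNode.getTransform().getInputsMap().values());
      }

      // Single output PCollection id of a transform node.
      public static String getOutputId(PTransformNode transformNode) {
        return Iterables.getOnlyElement(transformNode.getTransform().getOutputsMap().values());
      }

      // Id used to register an executable stage's intermediate dataset.
      public static String getExecutableStageIntermediateId(PTransformNode transformNode) {
        return transformNode.getId();
      }
    }
    ```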

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    // create input DStream from RDD queue

Review comment:
       Nit: this comment is obvious from the code.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Instant;
+
+/**
+ * Translation context used to lazily store Spark data sets during portable pipeline translation and
+ * compute them after translation.
+ */
+public class SparkStreamingTranslationContext extends SparkTranslationContext {
+  private final JavaStreamingContext streamingContext;
+  private final Instant firstTimestamp;

Review comment:
       Nit: `firstTimestamp` sounds a little strange here, since it's the only timestamp around. Maybe `initialTimestamp` or something?

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyRDD =

Review comment:
       Nit: variable naming. An RDD containing an empty byte array is not empty itself.







[GitHub] [beam] ibzib commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r471667214



##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, batch-interval, and other user-related knobs.
+ */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions, PipelineOptions {
+  @Description("Timeout for testing Spark portable streaming, in milliseconds.")
+  @Default.Long(-1L)

Review comment:
       It's not really documented anywhere, but I found it in the Spark source. Looks like `awaitTermination()` is indeed equivalent to `awaitTerminationOrTimeout(-1)`.
   
   https://github.com/apache/spark/blob/b03761e3303e979999d4faa5cf4d1719a82e06cb/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala#L637
   
   https://github.com/apache/spark/blob/b03761e3303e979999d4faa5cf4d1719a82e06cb/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala#L58
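
    Given that, the wait logic can be a single call where the option's -1 default just means "block until termination". A minimal sketch (class and method names below are illustrative, not from the PR):

    ```java
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    class StreamingWaitSketch {
      static void awaitCompletion(JavaStreamingContext jssc, long streamingTimeoutMs) {
        // With a positive timeout this returns once the time elapses or the context
        // stops; with -1 it blocks until termination, just like awaitTermination().
        jssc.awaitTerminationOrTimeout(streamingTimeoutMs);
      }
    }
    ```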







[GitHub] [beam] ibzib commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-652683027


   > I currently have streaming context as a subclass of the original spark translation context, but thinking some more I'm not sure if that totally works, and it might be better to have them both implementing an interface instead?
   
   I don't see any problems currently. If we do find/introduce any inconsistencies or confusing behaviors, we can change it later.
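
    For what it's worth, the interface-based alternative raised in the quoted question could look roughly like the sketch below; the names and the `Dataset` stand-in are illustrative only, and the PR keeps the subclass approach:

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Stand-in for the runner's Dataset abstraction.
    interface Dataset {}

    // A shared contract both a batch and a streaming translation context could implement,
    // each keeping its own Spark entry point (JavaSparkContext vs. JavaStreamingContext).
    interface TranslationContext {
      void pushDataset(String pCollectionId, Dataset dataset);
      Dataset popDataset(String pCollectionId);
    }

    class StreamingTranslationContextSketch implements TranslationContext {
      private final Map<String, Dataset> datasets = new HashMap<>();

      @Override
      public void pushDataset(String pCollectionId, Dataset dataset) {
        datasets.put(pCollectionId, dataset);
      }

      @Override
      public Dataset popDataset(String pCollectionId) {
        return datasets.remove(pCollectionId);
      }
    }
    ```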





[GitHub] [beam] ibzib commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-654405149


   Please fix compilation errors:
   
   ```
   > Task :runners:spark:compileJava FAILED
   /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Commit/src/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java:52: warning: [MissingOverride] knownUrns implements method in SparkPortablePipelineTranslator; expected @Override
     public Set<String> knownUrns() {
                        ^
       (see http://errorprone.info/bugpattern/MissingOverride)
     Did you mean '@Override public Set<String> knownUrns() {'?
   /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Commit/src/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java:77: warning: [MissingOverride] translate implements method in SparkPortablePipelineTranslator; expected @Override
     public void translate(
                 ^
       (see http://errorprone.info/bugpattern/MissingOverride)
     Did you mean '@Override public void translate('?
   /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Commit/src/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java:126: warning: [MissingOverride] createTranslationContext implements method in SparkPortablePipelineTranslator; expected @Override
     public SparkStreamingTranslationContext createTranslationContext(
                                             ^
       (see http://errorprone.info/bugpattern/MissingOverride)
     Did you mean '@Override public SparkStreamingTranslationContext createTranslationContext('?
   ```
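
    These are errorprone's MissingOverride warnings; the fix is simply adding `@Override` to the listed methods. A self-contained toy illustration of the pattern (not the actual classes):

    ```java
    import java.util.Collections;
    import java.util.Set;

    interface UrnProvider {
      Set<String> knownUrns();
    }

    class ExampleTranslator implements UrnProvider {
      // Adding @Override on methods that implement an interface method
      // satisfies the MissingOverride check.
      @Override
      public Set<String> knownUrns() {
        return Collections.emptySet();
      }
    }
    ```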





[GitHub] [beam] annaqin418 commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
annaqin418 commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-655169602


   R: @robertwb 
   This PR is still a while from being ready, but just wanted to tag you as well in my progress.





[GitHub] [beam] ibzib commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r471663049



##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Instant;
+
+/**
+ * Translation context used to lazily store Spark data sets during portable pipeline translation and
+ * compute them after translation.
+ */
+public class SparkStreamingTranslationContext extends SparkTranslationContext {
+  private final JavaStreamingContext streamingContext;
+  private final Instant firstTimestamp;
+
+  public SparkStreamingTranslationContext(
+      JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo) {
+    super(jsc, options, jobInfo);
+    Duration batchDuration = new Duration(options.getBatchIntervalMillis());
+    this.streamingContext = new JavaStreamingContext(jsc, batchDuration);
+    this.firstTimestamp = new Instant();
+  }
+
+  public JavaStreamingContext getStreamingContext() {

Review comment:
       Ah okay. Disregard then.







[GitHub] [beam] aromanenko-dev commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-674036749


   @annaqin418 Is this PR WIP?





[GitHub] [beam] ibzib commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-674252294


   Java precommit failures are unrelated (see BEAM-7865)





[GitHub] [beam] robertwb commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-678555841


   Run Java Spark PortableValidatesRunner Batch





[GitHub] [beam] ibzib commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r471838797



##########
File path: runners/spark/job-server/build.gradle
##########
@@ -82,20 +82,60 @@ runShadow {
     jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name) {
-  createPortableValidatesRunnerTask(
-    name: "validatesPortableRunner${name}",
-    jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
-    jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
-    testClasspathConfiguration: configurations.validatesPortableRunner,
-    numParallelTests: 4,
-    environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
-    systemProperties: [
-      "beam.spark.test.reuseSparkContext": "false",
-      "spark.ui.enabled": "false",
-      "spark.ui.showConsoleProgress": "false",
-    ],
-    testCategories: {
+def portableValidatesRunnerTask(String name, Boolean streaming) {
+  def pipelineOptions = []
+  def testCategories
+  def testFilter
+
+  if (streaming) {
+    pipelineOptions += "--streaming"
+    pipelineOptions += "--streamingTimeoutMs=20000"
+
+    testCategories = {
+      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+      excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+      excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+      // SplittableDoFnTests
+      excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
+      // Currently unsupported in portable streaming
+      excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'

Review comment:
       Link JIRAs for side input, state, and timer support.







[GitHub] [beam] ibzib commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-674252627


   run seed job





[GitHub] [beam] ibzib commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r450521542



##########
File path: runners/spark/job-server/build.gradle
##########
@@ -82,13 +82,21 @@ runShadow {
     jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name) {
+def portableValidatesRunnerTask(String name, Boolean streaming) {
+  def pipelineOptions = [
+          // Limit resource consumption via parallelism
+          "--parallelism=2",

Review comment:
       This option only applies to Flink, it has no meaning in Spark. https://github.com/apache/beam/blob/103facbf80c2e4786e60367161ec59352196d04a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java#L74







[GitHub] [beam] ibzib commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-674225949


   Run Spotless PreCommit





[GitHub] [beam] annaqin418 commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
annaqin418 commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r471169567



##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, batch-interval, and other user-related knobs.
+ */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+    extends SparkPipelineOptions, PortablePipelineOptions, PipelineOptions {
+  @Description("Timeout for testing Spark portable streaming, in milliseconds.")
+  @Default.Long(-1L)

Review comment:
       This is a good question; I couldn't find documentation on the proper way to have no timeout. Typically it seems that you would just use two entirely separate methods, awaitTermination() and awaitTerminationOrTimeout(). -1L simply appeared to give the desired behavior...







[GitHub] [beam] annaqin418 commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
annaqin418 commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r471198065



##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Instant;
+
+/**
+ * Translation context used to lazily store Spark data sets during portable pipeline translation and
+ * compute them after translation.
+ */
+public class SparkStreamingTranslationContext extends SparkTranslationContext {
+  private final JavaStreamingContext streamingContext;
+  private final Instant firstTimestamp;
+
+  public SparkStreamingTranslationContext(
+      JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo) {
+    super(jsc, options, jobInfo);
+    Duration batchDuration = new Duration(options.getBatchIntervalMillis());
+    this.streamingContext = new JavaStreamingContext(jsc, batchDuration);
+    this.firstTimestamp = new Instant();
+  }
+
+  public JavaStreamingContext getStreamingContext() {

Review comment:
       I'm not sure I can, since SparkPipelineRunner calls this method but lives outside the translation package.
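
       If it helps, a rough sketch of the cross-package call being described, which is why the accessor has to stay public (class name hypothetical; simplified, not the actual runner code):

```java
package org.apache.beam.runners.spark;

import org.apache.beam.runners.spark.translation.SparkStreamingTranslationContext;

class StreamingContextCallerSketch {
  void startStreaming(SparkStreamingTranslationContext context) {
    // The caller sits one package above the translation package, so a package-private
    // getStreamingContext() would not be visible here.
    context.getStreamingContext().start();
  }
}
```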




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r474389484



##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+    rddQueue.offer(emptyByteArrayRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);

Review comment:
       Be consistent about what's declared final and what's not.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+    rddQueue.offer(emptyByteArrayRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    final WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    final WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(inputValueCoder, windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            dStream,
+            inputKeyCoder,
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+    Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Coder windowCoder =
+        getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+    // TODO (BEAM-10712): handle side inputs.
+    ImmutableMap<
+            String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>>
+        broadcastVariables = ImmutableMap.copyOf(new HashMap<>());
+
+    SparkExecutableStageFunction<InputT, SideInputT> function =
+        new SparkExecutableStageFunction<>(
+            stagePayload,
+            context.jobInfo,
+            outputMap,
+            SparkExecutableStageContextFactory.getInstance(),
+            broadcastVariables,
+            MetricsAccumulator.getInstance(),
+            windowCoder);
+    JavaDStream<RawUnionValue> staged = inputDStream.mapPartitions(function);
+
+    String intermediateId = getExecutableStageIntermediateId(transformNode);
+    context.pushDataset(
+        intermediateId,
+        new Dataset() {
+          @Override
+          public void cache(String storageLevel, Coder<?> coder) {
+            StorageLevel level = StorageLevel.fromString(storageLevel);
+            staged.persist(level);
+          }
+
+          @Override
+          public void action() {
+            // Empty function to force computation of RDD.
+            staged.foreachRDD(TranslationUtils.emptyVoidFunction());
+          }
+
+          @Override
+          public void setName(String name) {
+            // ignore
+          }
+        });
+    // pop dataset to mark DStream as used
+    context.popDataset(intermediateId);
+
+    for (String outputId : outputs.values()) {
+      JavaDStream<WindowedValue<OutputT>> outStream =
+          staged.flatMap(new SparkExecutableStageExtractionFunction<>(outputMap.get(outputId)));
+      context.pushDataset(outputId, new UnboundedDataset<>(outStream, streamSources));
+    }
+
+    if (outputs.isEmpty()) {
+      // Add sink to ensure all outputs are computed

Review comment:
       In this case there are no outputs (per the above line). Perhaps just say something to the effect that we want to ensure the stage is executed. 
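
       For what it's worth, a small self-contained sketch of the underlying Spark behavior (not Beam code): a DStream with no registered output operation is never computed, so a no-op foreachRDD is what actually forces the stage to run each micro-batch.

```java
import org.apache.spark.streaming.api.java.JavaDStream;

class EmptySinkSketch {
  static <T> void forceExecution(JavaDStream<T> stageOutput) {
    // Nothing is written anywhere; registering an output operation is enough to make
    // Spark evaluate the upstream lineage for every micro-batch.
    stageOutput.foreachRDD(rdd -> { });
  }
}
```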

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+    rddQueue.offer(emptyByteArrayRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =

Review comment:
       This stream isn't empty, is it? 
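
       For illustration, a self-contained sketch of the queueStream pattern used above (not Beam code); the resulting stream is not actually empty, it emits a single micro-batch containing one element:

```java
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

class ImpulseQueueStreamSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("impulse-sketch");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));

    // A single RDD holding one empty byte array -- the "impulse" element.
    JavaRDD<byte[]> impulse = jsc.parallelize(Collections.singletonList(new byte[0]));
    Queue<JavaRDD<byte[]>> queue = new LinkedBlockingQueue<>();
    queue.offer(impulse);

    // oneAtATime = true: the queued RDD is delivered in exactly one micro-batch.
    JavaDStream<byte[]> impulseStream = jssc.queueStream(queue, true);
    impulseStream.foreachRDD(rdd -> System.out.println("batch element count = " + rdd.count()));

    jssc.start();
    jssc.awaitTerminationOrTimeout(5_000);
    jssc.stop();
  }
}
```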

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+    rddQueue.offer(emptyByteArrayRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    final WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    final WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(inputValueCoder, windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            dStream,
+            inputKeyCoder,
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();

Review comment:
       Nit: I'm noticing the pattern of assigning a huge number of locals at the top of a function and then only using them once below. Often (not always) the code would be more concise if the declarations were used inline (e.g. use `inputDataset.getDStream()` rather than assign to an intermediate `inputDStream`). 

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+    rddQueue.offer(emptyByteArrayRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    final WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    final WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(inputValueCoder, windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            dStream,
+            inputKeyCoder,
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+    Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Coder windowCoder =
+        getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+    // TODO (BEAM-10712): handle side inputs.
+    ImmutableMap<
+            String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>>
+        broadcastVariables = ImmutableMap.copyOf(new HashMap<>());
+
+    SparkExecutableStageFunction<InputT, SideInputT> function =
+        new SparkExecutableStageFunction<>(
+            stagePayload,
+            context.jobInfo,
+            outputMap,
+            SparkExecutableStageContextFactory.getInstance(),
+            broadcastVariables,
+            MetricsAccumulator.getInstance(),
+            windowCoder);
+    JavaDStream<RawUnionValue> staged = inputDStream.mapPartitions(function);
+
+    String intermediateId = getExecutableStageIntermediateId(transformNode);
+    context.pushDataset(
+        intermediateId,
+        new Dataset() {
+          @Override
+          public void cache(String storageLevel, Coder<?> coder) {
+            StorageLevel level = StorageLevel.fromString(storageLevel);
+            staged.persist(level);
+          }
+
+          @Override
+          public void action() {
+            // Empty function to force computation of RDD.
+            staged.foreachRDD(TranslationUtils.emptyVoidFunction());
+          }
+
+          @Override
+          public void setName(String name) {
+            // ignore
+          }
+        });
+    // pop dataset to mark DStream as used
+    context.popDataset(intermediateId);
+
+    for (String outputId : outputs.values()) {
+      JavaDStream<WindowedValue<OutputT>> outStream =
+          staged.flatMap(new SparkExecutableStageExtractionFunction<>(outputMap.get(outputId)));
+      context.pushDataset(outputId, new UnboundedDataset<>(outStream, streamSources));
+    }
+
+    if (outputs.isEmpty()) {
+      // Add sink to ensure all outputs are computed
+      JavaDStream<WindowedValue<OutputT>> outStream =
+          staged.flatMap((rawUnionValue) -> Collections.emptyIterator());
+      context.pushDataset(
+          String.format("EmptyOutputSink_%d", context.nextSinkId()),
+          new UnboundedDataset<>(outStream, streamSources));
+    }
+  }
+
+  private static <T> void translateFlatten(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    Map<String, String> inputsMap = transformNode.getTransform().getInputsMap();
+    JavaDStream<WindowedValue<T>> unifiedStreams;
+    final List<Integer> streamSources = new ArrayList<>();
+
+    if (inputsMap.isEmpty()) {
+      Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+      q.offer(context.getSparkContext().emptyRDD());
+      unifiedStreams = context.getStreamingContext().queueStream(q);
+    } else {
+      final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
+      for (String inputId : inputsMap.values()) {
+        Dataset dataset = context.popDataset(inputId);
+        if (dataset instanceof UnboundedDataset) {
+          UnboundedDataset<T> unboundedDataset = (UnboundedDataset<T>) dataset;
+          streamSources.addAll(unboundedDataset.getStreamSources());
+          dStreams.add(unboundedDataset.getDStream());
+        } else {
+          // create a single RDD stream.
+          Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+          q.offer(((BoundedDataset) dataset).getRDD());
+          // TODO: this is not recoverable from checkpoint!

Review comment:
       File and reference a JIRA. 

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))

Review comment:
       You could even defer the singleton iterable wrapping to here and use `windowCoder.encode(...)` rather than `CoderHelpers.toByteArrays`.
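
   A minimal sketch of that suggestion, as a fragment of translateImpulse; it assumes the `windowCoder` and streaming `context` defined in this diff and encodes the single impulse element with `windowCoder.encode(...)` instead of going through `CoderHelpers.toByteArrays`:

   ```
   // Build and encode the single impulse element directly.
   WindowedValue<byte[]> impulse =
       WindowedValue.of(
           new byte[0],
           BoundedWindow.TIMESTAMP_MIN_VALUE,
           GlobalWindow.INSTANCE,
           PaneInfo.NO_FIRING);
   java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
   try {
     windowCoder.encode(impulse, bos);
   } catch (java.io.IOException e) {
     throw new RuntimeException("Could not encode impulse value", e);
   }

   JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
       context
           .getSparkContext()
           .parallelize(Collections.singletonList(bos.toByteArray()))
           .map(CoderHelpers.fromByteFunction(windowCoder));
   ```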

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+    rddQueue.offer(emptyByteArrayRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);

Review comment:
       I have to admit I'm a bit fuzzy on how this holds back (or doesn't) the watermark. 

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =

Review comment:
       It seems it'd be easier to construct this with
   
   ```
   Collections.singletonList(
       WindowedValue.of(...));
   ```
   
   rather than use an intermediate timestamped value list.
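
   Spelled out with the values used in this diff, that would look roughly like:

   ```
   // Build the single windowed impulse element directly, skipping the
   // intermediate TimestampedValue list and the stream-and-map pipeline.
   Iterable<WindowedValue<byte[]>> windowedValues =
       Collections.singletonList(
           WindowedValue.of(
               new byte[0],
               BoundedWindow.TIMESTAMP_MIN_VALUE,
               GlobalWindow.INSTANCE,
               PaneInfo.NO_FIRING));
   ```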

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+    implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+  private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+  interface PTransformTranslator {
+
+    /** Translates transformNode from Beam into the Spark context. */
+    void translate(
+        PTransformNode transformNode,
+        RunnerApi.Pipeline pipeline,
+        SparkStreamingTranslationContext context);
+  }
+
+  @Override
+  public Set<String> knownUrns() {
+    return urnToTransformTranslator.keySet();
+  }
+
+  public SparkStreamingPortablePipelineTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        SparkStreamingPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        SparkStreamingPortablePipelineTranslator::translateReshuffle);
+    this.urnToTransformTranslator = translatorMap.build();
+  }
+
+  /** Translates pipeline from Beam into the Spark context. */
+  @Override
+  public void translate(
+      final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transformNode.getTransform().getSpec().getUrn(),
+              SparkStreamingPortablePipelineTranslator::urnNotFound)
+          .translate(transformNode, pipeline, context);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Transform %s has unknown URN %s",
+            transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+  }
+
+  private static void translateImpulse(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+
+    TimestampedValue<byte[]> tsValue = TimestampedValue.atMinimumTimestamp(new byte[0]);
+    Iterable<TimestampedValue<byte[]>> timestampedValues = Collections.singletonList(tsValue);
+    Iterable<WindowedValue<byte[]>> windowedValues =
+        StreamSupport.stream(timestampedValues.spliterator(), false)
+            .map(
+                timestampedValue ->
+                    WindowedValue.of(
+                        timestampedValue.getValue(),
+                        timestampedValue.getTimestamp(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING))
+            .collect(Collectors.toList());
+
+    ByteArrayCoder coder = ByteArrayCoder.of();
+    WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+        WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
+    JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
+        context
+            .getSparkContext()
+            .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+    Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+    rddQueue.offer(emptyByteArrayRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyStream =
+        context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
+
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyStream, Collections.singletonList(emptyStream.inputDStream().id()));
+
+    // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
+    GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+        new GlobalWatermarkHolder.SparkWatermarks(
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            context.getFirstTimestamp());
+    GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+    context.pushDataset(getOutputId(transformNode), output);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    String inputId = getInputId(transformNode);
+    UnboundedDataset<KV<K, V>> inputDataset =
+        (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+    WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+        getWindowedValueCoder(inputId, components);
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+    Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    final WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+    final WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    final WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(inputValueCoder, windowFn.windowCoder());
+
+    JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+        SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+            dStream,
+            inputKeyCoder,
+            wvCoder,
+            windowingStrategy,
+            context.getSerializableOptions(),
+            streamSources,
+            transformNode.getId());
+
+    context.pushDataset(
+        getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+  }
+
+  private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+      PTransformNode transformNode,
+      RunnerApi.Pipeline pipeline,
+      SparkStreamingTranslationContext context) {
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transformNode.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId = stagePayload.getInput();
+    UnboundedDataset<InputT> inputDataset =
+        (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+    List<Integer> streamSources = inputDataset.getStreamSources();
+    JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+    Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Coder windowCoder =
+        getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+    // TODO (BEAM-10712): handle side inputs.

Review comment:
       Throw a clear exception here if the executable stage declares itself as having side inputs.
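
   A minimal sketch of such a guard, placed next to the TODO; `getSideInputsCount()` is assumed here to be the generated accessor for the payload's side_inputs field:

   ```
   // Fail fast instead of silently ignoring side inputs (BEAM-10712).
   if (stagePayload.getSideInputsCount() > 0) {
     throw new UnsupportedOperationException(
         String.format(
             "Side inputs are not yet supported for executable stage %s in Spark portable streaming.",
             transformNode.getId()));
   }
   ```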




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-678529101


   run seed job


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib commented on a change in pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12157:
URL: https://github.com/apache/beam/pull/12157#discussion_r448656410



##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -40,9 +41,7 @@
 import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
-import org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
-import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.apache.beam.runners.spark.translation.SparkTranslationContext;
+import org.apache.beam.runners.spark.translation.*;

Review comment:
       Import classes instead of packages to avoid bloating the namespace.
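
   For example, the wildcard could be expanded into explicit imports; the exact list depends on what SparkPipelineRunner actually references, and the streaming translator and context names below are taken from elsewhere in this PR:

   ```
   import org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
   import org.apache.beam.runners.spark.translation.SparkContextFactory;
   import org.apache.beam.runners.spark.translation.SparkPortablePipelineTranslator;
   import org.apache.beam.runners.spark.translation.SparkStreamingPortablePipelineTranslator;
   import org.apache.beam.runners.spark.translation.SparkStreamingTranslationContext;
   import org.apache.beam.runners.spark.translation.SparkTranslationContext;
   ```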

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkTranslationContext.java
##########
@@ -35,15 +35,15 @@
  * compute them after translation.
  */
 public class SparkTranslationContext {
-  private final JavaSparkContext jsc;
+  protected final JavaSparkContext jsc;

Review comment:
       Why are these access changes necessary?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-678529142


   Run Java Spark PortableValidatesRunner Batch


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib commented on pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12157:
URL: https://github.com/apache/beam/pull/12157#issuecomment-678590444


   Run Java Spark PortableValidatesRunner Batch


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib merged pull request #12157: [BEAM-7587] Spark portable streaming

Posted by GitBox <gi...@apache.org>.
ibzib merged pull request #12157:
URL: https://github.com/apache/beam/pull/12157


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org